This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5eebfdb0de4 Add CDC server push data record implementation and
refactor pipeline importer (#23168)
5eebfdb0de4 is described below
commit 5eebfdb0de4091e2dbcb8326bed976bceaec4763
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Jan 2 22:26:25 2023 +0800
Add CDC server push data record implementation and refactor pipeline
importer (#23168)
* Add CDC data record result implementation and refactor pipeline importer
* Fix codestyle and improved
* Add blob value
* Fix codestyle
---
.../api/config/ingest/DumperConfiguration.java | 2 +
.../pipeline/spi/importer/ImporterCreator.java | 4 +-
.../data/pipeline/spi/importer/ImporterType.java} | 23 +--
.../data/pipeline/cdc/client/CDCClient.java | 2 +-
.../client/handler/SubscriptionRequestHandler.java | 18 +-
.../client/parameter/StartCDCClientParameter.java | 2 +
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 34 +---
.../api/pojo/CreateSubscriptionJobParameter.java | 2 +
.../cdc/config/job/CDCJobConfiguration.java | 2 +
.../cdc/context/job/CDCJobItemContext.java | 15 +-
.../data/pipeline/cdc/core/ack/CDCAckHolder.java | 87 ++++++++++
.../ack/CDCAckPosition.java} | 30 ++--
.../pipeline/cdc/core/importer/CDCImporter.java | 71 +++++++-
.../cdc/core/importer/CDCImporterCreator.java | 5 +-
.../importer/connector/CDCImporterConnector.java | 189 ++++++++++++++++++++-
.../generator/DataRecordComparatorGenerator.java | 44 +++++
.../data/pipeline/cdc/util/CDCDataRecordUtil.java | 110 ++++++++++++
.../pipeline/cdc/util/ColumnValueConvertUtil.java | 161 ++++++++++++++++++
.../cdc/util/DataRecordResultConvertUtil.java | 64 +++++++
.../cdc/yaml/job/YamlCDCJobConfiguration.java | 2 +
.../yaml/job/YamlCDCJobConfigurationSwapper.java | 3 +-
.../pipeline/cdc/core/ack/CDCAckHolderTest.java | 64 +++++++
.../cdc/core/importer/CDCImporterCreatorTest.java | 7 +-
.../DataRecordComparatorGeneratorTest.java | 55 ++++++
.../pipeline/cdc/util/CDCDataRecordUtilTest.java | 71 ++++++++
.../cdc/util/ColumnValueConvertUtilTest.java | 100 +++++++++++
.../src/main/proto/CDCRequestProtocol.proto | 16 +-
.../src/main/proto/CDCResponseProtocol.proto | 19 ++-
.../core/importer/DataSourceImporterCreator.java | 3 +-
.../data/pipeline/core/record/RecordUtil.java | 19 +++
.../data/pipeline/core/task/IncrementalTask.java | 4 +-
.../data/pipeline/core/task/InventoryTask.java | 20 +--
.../OpenGaussIncrementalDumperCreator.java | 2 +-
.../opengauss/ingest/OpenGaussWALDumper.java | 4 +-
.../backend/handler/cdc/CDCBackendHandler.java | 61 +++++--
.../frontend/netty/CDCChannelInboundHandler.java | 32 +++-
.../core/fixture/FixtureImporterCreator.java | 3 +-
.../data/pipeline/core/record/RecordUtilTest.java | 21 ++-
38 files changed, 1236 insertions(+), 135 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index c21e22f3343..145828c2052 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -50,6 +50,8 @@ public class DumperConfiguration {
private TableNameSchemaNameMapping tableNameSchemaNameMapping;
+ private boolean decodeWithTX;
+
/**
* Get logic table name.
*
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
index 38d2e8ba275..2e92ef36273 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
@@ -38,7 +38,9 @@ public interface ImporterCreator extends TypedSPI {
* @param importerConnector import connector
* @param channel channel
* @param jobProgressListener job progress listener
+ * @param importerType importer type
* @return importer
*/
- Importer createImporter(ImporterConfiguration importerConfig,
ImporterConnector importerConnector, PipelineChannel channel,
PipelineJobProgressListener jobProgressListener);
+ Importer createImporter(ImporterConfiguration importerConfig,
ImporterConnector importerConnector, PipelineChannel channel,
PipelineJobProgressListener jobProgressListener,
+ ImporterType importerType);
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterType.java
similarity index 59%
copy from
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
copy to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterType.java
index ba7a7e40b2c..7354da9e80c 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterType.java
@@ -15,27 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector;
-
-import io.netty.channel.Channel;
-import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+package org.apache.shardingsphere.data.pipeline.spi.importer;
/**
- * CDC importer connector.
+ * Importer type.
*/
-@RequiredArgsConstructor
-public final class CDCImporterConnector implements ImporterConnector {
-
- private final Channel channel;
+public enum ImporterType {
- @Override
- public Object getConnector() {
- return channel;
- }
+ INVENTORY,
- @Override
- public String getType() {
- return "CDC";
- }
+ INCREMENTAL
}
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index 014fa995453..4f2acf1fd90 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -90,7 +90,7 @@ public final class CDCClient {
channel.pipeline().addLast(new ProtobufEncoder());
channel.pipeline().addLast(new
LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
channel.pipeline().addLast(new
SubscriptionRequestHandler(parameter.getDatabase(),
parameter.getSubscriptionName(), parameter.getSubscribeTables(),
- parameter.getSubscriptionMode()));
+ parameter.getSubscriptionMode(),
parameter.isIncrementalGlobalOrderly()));
}
});
ChannelFuture future = bootstrap.connect(address, port).sync();
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
index e1aa325a31d..17855103394 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnect
import
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import
org.apache.shardingsphere.data.pipeline.cdc.client.event.CreateSubscriptionEvent;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtil;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
@@ -33,6 +34,8 @@ import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscr
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
import java.util.List;
@@ -51,11 +54,13 @@ public final class SubscriptionRequestHandler extends
ChannelInboundHandlerAdapt
private final SubscriptionMode subscribeMode;
+ private final boolean incrementalGlobalOrderly;
+
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final
Object evt) {
if (evt instanceof CreateSubscriptionEvent) {
CreateSubscriptionRequest createSubscriptionRequest =
CreateSubscriptionRequest.newBuilder().setDatabase(database).setSubscriptionMode(subscribeMode).setSubscriptionName(subscriptionName)
- .addAllTableNames(subscribeTables).build();
+
.addAllTableNames(subscribeTables).setIncrementalGlobalOrderly(incrementalGlobalOrderly).build();
CDCRequest request =
CDCRequest.newBuilder().setCreateSubscription(createSubscriptionRequest).setRequestId(RequestIdUtil.generateRequestId()).build();
ctx.writeAndFlush(request);
}
@@ -64,7 +69,7 @@ public final class SubscriptionRequestHandler extends
ChannelInboundHandlerAdapt
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg)
{
CDCResponse response = (CDCResponse) msg;
- if (response.getStatus() == Status.FAILED) {
+ if (response.getStatus() != Status.SUCCEED) {
log.error("received error response {}", msg);
}
ClientConnectionContext connectionContext =
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
@@ -77,7 +82,7 @@ public final class SubscriptionRequestHandler extends
ChannelInboundHandlerAdapt
} else if (connectionContext.getStatus() ==
ClientConnectionStatus.CREATING_SUBSCRIPTION) {
startSubscription(response, connectionContext);
} else {
- subscribeDataRecords(ctx);
+ subscribeDataRecords(ctx, response.getDataRecordResult());
}
}
@@ -94,8 +99,11 @@ public final class SubscriptionRequestHandler extends
ChannelInboundHandlerAdapt
connectionContext.setStatus(ClientConnectionStatus.SUBSCRIBING);
}
- private void subscribeDataRecords(final ChannelHandlerContext ctx) {
- // TODO to be implemented
+ private void subscribeDataRecords(final ChannelHandlerContext ctx, final
DataRecordResult result) {
+ List<Record> recordsList = result.getRecordsList();
+ log.debug("received records {}", recordsList);
+ // TODO data needs to be processed, such as writing to a database
+
ctx.channel().writeAndFlush(CDCRequest.newBuilder().setAckRequest(AckRequest.newBuilder().setAckId(result.getAckId()).build()).build());
}
@Override
diff --git
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
index f18bcbd6162..90d0ab31c6e 100644
---
a/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++
b/kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
@@ -46,4 +46,6 @@ public final class StartCDCClientParameter {
private String subscriptionName;
private SubscriptionMode subscriptionMode = SubscriptionMode.INCREMENTAL;
+
+ private boolean incrementalGlobalOrderly;
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index bc52ce5bd3b..336a3cc75c2 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -28,7 +28,6 @@ import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
@@ -36,9 +35,7 @@ import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
@@ -84,7 +81,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -112,6 +108,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
yamlJobConfig.setTableNames(event.getSubscribeTableNames());
yamlJobConfig.setSubscriptionName(event.getSubscriptionName());
yamlJobConfig.setSubscriptionMode(event.getSubscriptionMode());
+ yamlJobConfig.setDecodeWithTX(event.isDecodeWithTX());
ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(event.getDatabase());
yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
List<JobDataNodeLine> jobDataNodeLines =
JobDataNodeLineConvertUtil.convertDataNodesToLines(event.getDataNodesMap());
@@ -182,21 +179,22 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(Collections.emptyMap());
String dataSourceName =
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
StandardPipelineDataSourceConfiguration actualDataSourceConfiguration
=
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
- DumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig.getJobId(), dataSourceName,
actualDataSourceConfiguration, tableNameMap, tableNameSchemaNameMapping);
+ DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig,
dataSourceName, actualDataSourceConfiguration, tableNameMap,
tableNameSchemaNameMapping);
ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
jobConfig.getTableNames(), tableNameSchemaNameMapping);
CDCTaskConfiguration result = new CDCTaskConfiguration(dumperConfig,
importerConfig);
log.debug("buildTaskConfiguration, result={}", result);
return result;
}
- private static DumperConfiguration buildDumperConfiguration(final String
jobId, final String dataSourceName, final PipelineDataSourceConfiguration
sourceDataSourceConfig,
+ private static DumperConfiguration buildDumperConfiguration(final
CDCJobConfiguration jobConfig, final String dataSourceName, final
PipelineDataSourceConfiguration sourceDataSourceConfig,
final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
DumperConfiguration result = new DumperConfiguration();
- result.setJobId(jobId);
+ result.setJobId(jobConfig.getJobId());
result.setDataSourceName(dataSourceName);
result.setDataSourceConfig(sourceDataSourceConfig);
result.setTableNameMap(tableNameMap);
result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+ result.setDecodeWithTX(jobConfig.isDecodeWithTX());
return result;
}
@@ -232,31 +230,15 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
return new
YamlCDCJobConfigurationSwapper().swapToYamlConfiguration((CDCJobConfiguration)
jobConfig);
}
- @Override
- public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
- // TODO to be implemented
- }
-
- @Override
- public Optional<InventoryIncrementalJobItemProgress>
getJobItemProgress(final String jobId, final int shardingItem) {
- // TODO to be implemented
- return Optional.empty();
- }
-
- @Override
- public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- // TODO to be implemented
- }
-
@Override
protected PipelineJobInfo getJobInfo(final String jobId) {
- // TODO to be implemented
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public void rollback(final String jobId) throws SQLException {
- // TODO to be implemented
+ stop(jobId);
+ dropJob(jobId);
}
@Override
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
index 02ddd7cb2f6..79c30206e54 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
@@ -40,4 +40,6 @@ public final class CreateSubscriptionJobParameter {
private final String subscriptionMode;
private final Map<String, List<DataNode>> dataNodesMap;
+
+ private final boolean decodeWithTX;
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
index de4358df453..d4c628ac874 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java
@@ -50,6 +50,8 @@ public final class CDCJobConfiguration implements
PipelineJobConfiguration {
private final List<JobDataNodeLine> jobShardingDataNodes;
+ private final boolean decodeWithTX;
+
private final int concurrency;
private final int retryTimes;
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index c2702337ec6..8d63276264e 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -33,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguratio
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
@@ -40,6 +41,7 @@ import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterCo
import java.util.Collection;
import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
/**
* CDC job item context.
@@ -72,6 +74,10 @@ public final class CDCJobItemContext implements
InventoryIncrementalJobItemConte
private final Collection<IncrementalTask> incrementalTasks = new
LinkedList<>();
+ private final AtomicLong processedRecordsCount = new AtomicLong(0);
+
+ private final AtomicLong inventoryRecordsCount = new AtomicLong(0);
+
private final LazyInitializer<PipelineDataSourceWrapper>
sourceDataSourceLazyInitializer = new
LazyInitializer<PipelineDataSourceWrapper>() {
@Override
@@ -100,7 +106,8 @@ public final class CDCJobItemContext implements
InventoryIncrementalJobItemConte
@Override
public void onProgressUpdated(final PipelineJobProgressUpdatedParameter
param) {
- // TODO to be implemented
+ processedRecordsCount.addAndGet(param.getProcessedRecordsCount());
+ PipelineJobProgressPersistService.notifyPersist(jobConfig.getJobId(),
shardingItem);
}
/**
@@ -126,16 +133,16 @@ public final class CDCJobItemContext implements
InventoryIncrementalJobItemConte
@Override
public long getProcessedRecordsCount() {
- throw new UnsupportedOperationException();
+ return processedRecordsCount.get();
}
@Override
public void updateInventoryRecordsCount(final long recordsCount) {
- // TODO to be implemented
+ inventoryRecordsCount.addAndGet(recordsCount);
}
@Override
public long getInventoryRecordsCount() {
- return 0;
+ return inventoryRecordsCount.get();
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
new file mode 100644
index 00000000000..134027c25c5
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolder.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * CDC ack holder.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CDCAckHolder {
+
+ private static final CDCAckHolder INSTANCE = new CDCAckHolder();
+
+ private final Map<String, Map<CDCImporter, CDCAckPosition>>
ackIdImporterMap = new ConcurrentHashMap<>();
+
+ /**
+ * the ack of CDC.
+ *
+ * @param ackId ack id
+ */
+ public void ack(final String ackId) {
+ Map<CDCImporter, CDCAckPosition> importerDataRecordMap =
ackIdImporterMap.remove(ackId);
+ if (null != importerDataRecordMap) {
+ importerDataRecordMap.forEach(CDCImporter::ackWithLastDataRecord);
+ }
+ }
+
+ /**
+ * Bind ack id.
+ *
+ * @param importerDataRecordMap import data record map
+ * @return ack id
+ */
+ public String bindAckIdWithPosition(final Map<CDCImporter, CDCAckPosition>
importerDataRecordMap) {
+ String result = generateAckId();
+ // TODO it's might need to persist to registry center in cluster mode.
+ ackIdImporterMap.put(result, importerDataRecordMap);
+ return result;
+ }
+
+ private String generateAckId() {
+ return "ACK-" + UUID.randomUUID();
+ }
+
+ /**
+ * Clean up.
+ *
+ * @param cdcImporter CDC importer
+ */
+ public void cleanUp(final CDCImporter cdcImporter) {
+ if (ackIdImporterMap.isEmpty()) {
+ return;
+ }
+ ackIdImporterMap.entrySet().removeIf(entry ->
entry.getValue().containsKey(cdcImporter));
+ }
+
+ /**
+ * Get instance.
+ *
+ * @return CDC ack holder
+ */
+ public static CDCAckHolder getInstance() {
+ return INSTANCE;
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
similarity index 60%
copy from
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
copy to
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
index 02ddd7cb2f6..469aa3870bb 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/pojo/CreateSubscriptionJobParameter.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
@@ -15,29 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.cdc.api.pojo;
+package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
+import lombok.AllArgsConstructor;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.datanode.DataNode;
-
-import java.util.List;
-import java.util.Map;
+import lombok.Setter;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
/**
- * Create subscription job parameter.
+ * CDC ack position.
*/
-@RequiredArgsConstructor
@Getter
-public final class CreateSubscriptionJobParameter {
-
- private final String database;
+@AllArgsConstructor
+public final class CDCAckPosition {
- private final List<String> subscribeTableNames;
+ @Setter
+ private Record lastRecord;
- private final String subscriptionName;
+ @Setter
+ private int dataRecordCount;
- private final String subscriptionMode;
+ private final long createTimeMills;
- private final Map<String, List<DataNode>> dataNodesMap;
+ public CDCAckPosition(final Record lastRecord, final int dataRecordCount) {
+ this(lastRecord, dataRecordCount, System.currentTimeMillis());
+ }
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index e9fd10e3ac4..5b7f7c08272 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -19,17 +19,33 @@ package
org.apache.shardingsphere.data.pipeline.cdc.core.importer;
import lombok.AccessLevel;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
/**
* CDC importer.
*/
+@Slf4j
public final class CDCImporter extends AbstractLifecycleExecutor implements
Importer {
@Getter(AccessLevel.PROTECTED)
@@ -37,24 +53,71 @@ public final class CDCImporter extends
AbstractLifecycleExecutor implements Impo
private final PipelineChannel channel;
+ private final CDCImporterConnector importerConnector;
+
private final PipelineJobProgressListener jobProgressListener;
+ @Getter
+ private final ImporterType importerType;
+
private final JobRateLimitAlgorithm rateLimitAlgorithm;
- public CDCImporter(final ImporterConfiguration importerConfig, final
ImporterConnector importerConnector, final PipelineChannel channel, final
PipelineJobProgressListener jobProgressListener) {
+ public CDCImporter(final ImporterConfiguration importerConfig, final
ImporterConnector importerConnector, final PipelineChannel channel, final
PipelineJobProgressListener jobProgressListener,
+ final ImporterType importerType) {
this.importerConfig = importerConfig;
- rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
+ rateLimitAlgorithm = null == importerConfig ? null :
importerConfig.getRateLimitAlgorithm();
this.channel = channel;
+ this.importerConnector = (CDCImporterConnector) importerConnector;
this.jobProgressListener = jobProgressListener;
+ this.importerType = importerType;
}
@Override
protected void runBlocking() {
- // TODO to be implemented
+ int batchSize = importerConfig.getBatchSize();
+ if (ImporterType.INCREMENTAL == importerType) {
+ importerConnector.sendIncrementalStartEvent(this, batchSize);
+ }
+ while (isRunning()) {
+ List<Record> records = channel.fetchRecords(batchSize, 3);
+ if (null != records && !records.isEmpty()) {
+ List<Record> recordList = records.stream().filter(each ->
!(each instanceof PlaceholderRecord)).collect(Collectors.toList());
+ try {
+ processDataRecords(recordList);
+ } catch (final SQLException ex) {
+ log.error("process data records failed", ex);
+ throw new RuntimeException(ex);
+ }
+ if (FinishedRecord.class.equals(records.get(records.size() -
1).getClass())) {
+ break;
+ }
+ }
+ }
+ }
+
+ private void processDataRecords(final List<Record> recordList) throws
SQLException {
+ if (null == recordList || recordList.isEmpty()) {
+ return;
+ }
+ if (null != rateLimitAlgorithm) {
+ rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
+ }
+ importerConnector.write(recordList, this, importerType);
+ }
+
+ /**
+ * Ack with last data record.
+ *
+ * @param cdcAckPosition cdc ack position
+ */
+ public void ackWithLastDataRecord(final CDCAckPosition cdcAckPosition) {
+ channel.ack(Collections.singletonList(cdcAckPosition.getLastRecord()));
+ jobProgressListener.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(cdcAckPosition.getDataRecordCount()));
}
@Override
protected void doStop() {
- // TODO to be implemented
+ importerConnector.clean();
+ CDCAckHolder.getInstance().cleanUp(this);
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
index d27023370aa..9f1034ef0eb 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreator.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
/**
@@ -31,8 +32,8 @@ public final class CDCImporterCreator implements
ImporterCreator {
@Override
public Importer createImporter(final ImporterConfiguration importerConfig,
final ImporterConnector importerConnector, final PipelineChannel channel,
- final PipelineJobProgressListener
jobProgressListener) {
- return new CDCImporter(importerConfig, importerConnector, channel,
jobProgressListener);
+ final PipelineJobProgressListener
jobProgressListener, final ImporterType importerType) {
+ return new CDCImporter(importerConfig, importerConnector, channel,
jobProgressListener, importerType);
}
@Override
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
index ba7a7e40b2c..257f2324d71 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/connector/CDCImporterConnector.java
@@ -18,24 +18,211 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector;
import io.netty.channel.Channel;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataRecordUtil;
+import
org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertUtil;
+import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
* CDC importer connector.
*/
-@RequiredArgsConstructor
+@Slf4j
public final class CDCImporterConnector implements ImporterConnector {
+ private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
+
+ private final Lock lock = new ReentrantLock();
+
+ private final Condition condition = lock.newCondition();
+
+ @Setter
+ private volatile boolean running = true;
+
+ @Getter
+ private final String database;
+
private final Channel channel;
+ private final int jobShardingCount;
+
+ private final Comparator<DataRecord> dataRecordComparator;
+
+ private final Map<String, String> tableNameSchemaMap = new HashMap<>();
+
+ private final Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap
= new ConcurrentHashMap<>();
+
+ private final AtomicInteger runningIncrementalTaskCount = new
AtomicInteger(0);
+
+ private Thread incrementalImporterTask;
+
+ public CDCImporterConnector(final Channel channel, final String database,
final int jobShardingCount, final List<String> tableNames, final
Comparator<DataRecord> dataRecordComparator) {
+ this.channel = channel;
+ this.database = database;
+ this.jobShardingCount = jobShardingCount;
+ tableNames.stream().filter(each -> each.contains(".")).forEach(each ->
{
+ String[] split = each.split("\\.");
+ tableNameSchemaMap.put(split[0], split[1]);
+ });
+ this.dataRecordComparator = dataRecordComparator;
+ }
+
@Override
public Object getConnector() {
return channel;
}
+ /**
+ * Write data record into channel.
+ *
+ * @param recordList data records
+ * @param cdcImporter cdc importer
+ * @param importerType importer type
+ */
+ public void write(final List<Record> recordList, final CDCImporter
cdcImporter, final ImporterType importerType) {
+ if (recordList.isEmpty()) {
+ return;
+ }
+ if (ImporterType.INVENTORY == importerType || null ==
dataRecordComparator) {
+ Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new
HashMap<>();
+ int dataRecordCount = (int) recordList.stream().filter(each ->
each instanceof DataRecord).count();
+ Record lastRecord = recordList.get(recordList.size() - 1);
+ if (lastRecord instanceof FinishedRecord && 0 == dataRecordCount) {
+ cdcImporter.ackWithLastDataRecord(new
CDCAckPosition(lastRecord, 0));
+ return;
+ }
+ importerDataRecordMap.put(cdcImporter, new
CDCAckPosition(RecordUtil.getLastNormalRecord(recordList), dataRecordCount));
+ writeImmediately(recordList, importerDataRecordMap);
+ } else if (ImporterType.INCREMENTAL == importerType) {
+ writeIntoQueue(recordList, cdcImporter);
+ }
+ }
+
+ private void writeImmediately(final List<? extends Record> recordList,
final Map<CDCImporter, CDCAckPosition> importerDataRecordMap) {
+ while (!channel.isWritable() && channel.isActive() && running) {
+ doAwait();
+ }
+ if (!channel.isActive() || !running) {
+ return;
+ }
+ List<DataRecordResult.Record> records = new LinkedList<>();
+ for (Record each : recordList) {
+ if (!(each instanceof DataRecord)) {
+ continue;
+ }
+ DataRecord dataRecord = (DataRecord) each;
+
records.add(DataRecordResultConvertUtil.convertDataRecordToRecord(database,
tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
+ }
+ String ackId =
CDCAckHolder.getInstance().bindAckIdWithPosition(importerDataRecordMap);
+ DataRecordResult dataRecordResult =
DataRecordResult.newBuilder().addAllRecords(records).setAckId(ackId).build();
+
channel.writeAndFlush(CDCResponseGenerator.succeedBuilder("").setDataRecordResult(dataRecordResult).build());
+ }
+
+ private void doAwait() {
+ lock.lock();
+ try {
+ condition.await(DEFAULT_TIMEOUT_MILLISECONDS,
TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException ignored) {
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @SneakyThrows(InterruptedException.class)
+ private void writeIntoQueue(final List<Record> dataRecords, final
CDCImporter cdcImporter) {
+ BlockingQueue<Record> blockingQueue =
incrementalRecordMap.get(cdcImporter);
+ if (null == blockingQueue) {
+ log.warn("not find the queue to write");
+ return;
+ }
+ for (Record each : dataRecords) {
+ blockingQueue.put(each);
+ }
+ }
+
+ /**
+ * Send finished record event.
+ *
+ * @param cdcImporter cdc importer
+ * @param batchSize batch size
+ */
+ public void sendIncrementalStartEvent(final CDCImporter cdcImporter, final
int batchSize) {
+ incrementalRecordMap.computeIfAbsent(cdcImporter, ignored -> new
ArrayBlockingQueue<>(batchSize));
+ int count = runningIncrementalTaskCount.incrementAndGet();
+ if (count < jobShardingCount || null == dataRecordComparator) {
+ return;
+ }
+ log.debug("start CDC incremental importer");
+ if (null == incrementalImporterTask) {
+ incrementalImporterTask = new Thread(new
CDCIncrementalImporterTask(batchSize));
+ incrementalImporterTask.start();
+ }
+ }
+
+ /**
+ * Clean CDC importer connector.
+ */
+ public void clean() {
+ running = false;
+ incrementalRecordMap.clear();
+ }
+
@Override
public String getType() {
return "CDC";
}
+
+ @RequiredArgsConstructor
+ private final class CDCIncrementalImporterTask implements Runnable {
+
+ private final int batchSize;
+
+ @Override
+ public void run() {
+ while (running) {
+ Map<CDCImporter, CDCAckPosition> cdcAckPositionMap = new
HashMap<>();
+ List<DataRecord> dataRecords = new LinkedList<>();
+ for (int i = 0; i < batchSize; i++) {
+ DataRecord minimumDataRecord =
CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(incrementalRecordMap,
dataRecordComparator, cdcAckPositionMap);
+ if (null == minimumDataRecord) {
+ break;
+ }
+ dataRecords.add(minimumDataRecord);
+ }
+ if (dataRecords.isEmpty()) {
+ ThreadUtil.sleep(200, TimeUnit.MILLISECONDS);
+ } else {
+ writeImmediately(dataRecords, cdcAckPositionMap);
+ }
+ }
+ }
+ }
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java
new file mode 100644
index 00000000000..7c467e0b910
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGenerator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+
+import java.util.Comparator;
+
+/**
+ * Data record comparator generator.
+ */
+public final class DataRecordComparatorGenerator {
+
+ /**
+ * Generator comparator.
+ *
+ * @param databaseType database type
+ * @return data record comparator
+ */
+ public static Comparator<DataRecord> generatorIncrementalComparator(final
DatabaseType databaseType) {
+ if (databaseType instanceof OpenGaussDatabaseType) {
+ return Comparator.comparing(DataRecord::getCsn,
Comparator.nullsFirst(Comparator.naturalOrder()));
+ }
+ // TODO MySQL and PostgreSQL not support now
+ return null;
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
new file mode 100644
index 00000000000..295d03cabfe
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * CDC data record util.
+ */
+public final class CDCDataRecordUtil {
+
+ /**
+ * Find minimum data record and save position.
+ *
+ * @param incrementalRecordMap CDC ack position map.
+ * @param dataRecordComparator CDC ack position map.
+ * @param cdcAckPositionMap CDC ack position map.
+ * @return minimum data record
+ */
+ public static DataRecord findMinimumDataRecordAndSavePosition(final
Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final
Comparator<DataRecord> dataRecordComparator,
+ final
Map<CDCImporter, CDCAckPosition> cdcAckPositionMap) {
+ if (null == dataRecordComparator) {
+ return
findMinimumDataRecordWithoutComparator(incrementalRecordMap, cdcAckPositionMap);
+ } else {
+ return findMinimumDataRecordWithComparator(incrementalRecordMap,
cdcAckPositionMap, dataRecordComparator);
+ }
+ }
+
+ private static DataRecord findMinimumDataRecordWithoutComparator(final
Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final
Map<CDCImporter, CDCAckPosition> cdcAckPositionMap) {
+ for (Entry<CDCImporter, BlockingQueue<Record>> entry :
incrementalRecordMap.entrySet()) {
+ Record record = entry.getValue().poll();
+ if (!(record instanceof DataRecord)) {
+ continue;
+ }
+ saveAckPosition(cdcAckPositionMap, entry.getKey(), record);
+ return (DataRecord) record;
+ }
+ return null;
+ }
+
+ private static void saveAckPosition(final Map<CDCImporter, CDCAckPosition>
cdcAckPositionMap, final CDCImporter cdcImporter, final Record record) {
+ CDCAckPosition cdcAckPosition = cdcAckPositionMap.get(cdcImporter);
+ if (null == cdcAckPosition) {
+ cdcAckPositionMap.put(cdcImporter, new CDCAckPosition(record, 1));
+ } else {
+ cdcAckPosition.setLastRecord(record);
+
cdcAckPosition.setDataRecordCount(cdcAckPosition.getDataRecordCount());
+ }
+ }
+
+ private static DataRecord findMinimumDataRecordWithComparator(final
Map<CDCImporter, BlockingQueue<Record>> incrementalRecordMap, final
Map<CDCImporter, CDCAckPosition> cdcAckPositionMap,
+ final
Comparator<DataRecord> dataRecordComparator) {
+ Map<CDCImporter, DataRecord> waitSortedMap = new HashMap<>();
+ for (Entry<CDCImporter, BlockingQueue<Record>> entry :
incrementalRecordMap.entrySet()) {
+ Record peek = entry.getValue().peek();
+ if (null == peek) {
+ continue;
+ }
+ if (peek instanceof DataRecord) {
+ waitSortedMap.put(entry.getKey(), (DataRecord) peek);
+ }
+ }
+ if (waitSortedMap.isEmpty()) {
+ return null;
+ }
+ DataRecord minRecord = null;
+ CDCImporter belongImporter = null;
+ for (Entry<CDCImporter, DataRecord> entry : waitSortedMap.entrySet()) {
+ if (null == minRecord) {
+ minRecord = entry.getValue();
+ belongImporter = entry.getKey();
+ continue;
+ }
+ if (dataRecordComparator.compare(minRecord, entry.getValue()) > 0)
{
+ minRecord = entry.getValue();
+ belongImporter = entry.getKey();
+ }
+ }
+ if (null == minRecord) {
+ return null;
+ }
+ incrementalRecordMap.get(belongImporter).poll();
+ saveAckPosition(cdcAckPositionMap, belongImporter, minRecord);
+ return minRecord;
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
new file mode 100644
index 00000000000..c62c9327748
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtil.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import com.google.gson.Gson;
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Message.Builder;
+import com.google.protobuf.StringValue;
+import com.google.protobuf.Struct;
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigDecimalValue;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigIntegerValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BlobValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ClobValue;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.LocalTimeValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.NullValue;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZonedDateTime;
+import java.util.Date;
+
+/**
+ * Column value convert util.
+ */
+@Slf4j
+public final class ColumnValueConvertUtil {
+
+ private static final Gson GSON = new Gson();
+
+ /**
+ * Convert java object to protobuf message.
+ *
+ * @param object object
+ * @return protobuf message
+ */
+ public static Message convertToProtobufMessage(final Object object) {
+ if (null == object) {
+ return NullValue.newBuilder().build();
+ }
+ if (object instanceof Integer) {
+ return Int32Value.newBuilder().setValue((int) object).build();
+ }
+ if (object instanceof Short) {
+ return Int32Value.newBuilder().setValue(((Short)
object).intValue()).build();
+ }
+ if (object instanceof Byte) {
+ return Int32Value.newBuilder().setValue(((Byte)
object).intValue()).build();
+ }
+ if (object instanceof Long) {
+ return Int64Value.newBuilder().setValue((long) object).build();
+ }
+ if (object instanceof BigInteger) {
+ return
BigIntegerValue.newBuilder().setValue(ByteString.copyFrom(((BigInteger)
object).toByteArray())).build();
+ }
+ if (object instanceof Float) {
+ return FloatValue.newBuilder().setValue((float) object).build();
+ }
+ if (object instanceof Double) {
+ return DoubleValue.newBuilder().setValue((double) object).build();
+ }
+ if (object instanceof BigDecimal) {
+ return
BigDecimalValue.newBuilder().setValue(object.toString()).build();
+ }
+ if (object instanceof String) {
+ return
StringValue.newBuilder().setValue(object.toString()).build();
+ }
+ if (object instanceof Boolean) {
+ return BoolValue.newBuilder().setValue((boolean) object).build();
+ }
+ if (object instanceof byte[]) {
+ return
BytesValue.newBuilder().setValue(ByteString.copyFrom((byte[]) object)).build();
+ }
+ if (object instanceof Date) {
+ return converToProtobufTimestamp((Date) object);
+ }
+ if (object instanceof LocalDateTime) {
+ return converToProtobufTimestamp(Timestamp.valueOf((LocalDateTime)
object));
+ }
+ if (object instanceof LocalDate) {
+ return converToProtobufTimestamp(Timestamp.valueOf(((LocalDate)
object).atStartOfDay()));
+ }
+ if (object instanceof LocalTime) {
+ LocalTime localTime = (LocalTime) object;
+ return
LocalTimeValue.newBuilder().setValue(localTime.toString()).build();
+ }
+ if (object instanceof ZonedDateTime) {
+ return
converToProtobufTimestamp(Timestamp.valueOf(((ZonedDateTime)
object).toLocalDateTime()));
+ }
+ if (object instanceof Instant) {
+ Instant instant = (Instant) object;
+ return
com.google.protobuf.Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).setNanos(instant.getNano()).build();
+ }
+ if (object instanceof Clob) {
+ Clob clob = (Clob) object;
+ try {
+ return ClobValue.newBuilder().setValue(clob.getSubString(1,
(int) clob.length())).build();
+ } catch (final SQLException ex) {
+ log.error("get clob length failed", ex);
+ throw new RuntimeException(ex);
+ }
+ }
+ if (object instanceof Blob) {
+ Blob blob = (Blob) object;
+ try {
+ return
BlobValue.newBuilder().setValue(ByteString.copyFrom(blob.getBytes(1, (int)
blob.length()))).build();
+ } catch (final SQLException ex) {
+ log.error("get blob bytes failed", ex);
+ throw new RuntimeException(ex);
+ }
+ }
+ return fromJson(GSON.toJson(object));
+ }
+
+ private static com.google.protobuf.Timestamp
converToProtobufTimestamp(final Date timestamp) {
+ long millis = timestamp.getTime();
+ return com.google.protobuf.Timestamp.newBuilder().setSeconds(millis /
1000).setNanos((int) ((millis % 1000) * 1000000)).build();
+ }
+
+ private static Message fromJson(final String json) {
+ Builder structBuilder = Struct.newBuilder();
+ try {
+ JsonFormat.parser().ignoringUnknownFields().merge(json,
structBuilder);
+ } catch (final InvalidProtocolBufferException ex) {
+ throw new RuntimeException(ex);
+ }
+ return structBuilder.build();
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
new file mode 100644
index 00000000000..9108ed6af2e
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtil.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import com.google.common.base.Strings;
+import com.google.protobuf.Any;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.DataChangeType;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Data record result convert util.
+ */
+public final class DataRecordResultConvertUtil {
+
+ /**
+ * Convert data record to record.
+ *
+ * @param database database
+ * @param schema schema
+ * @param dataRecord data record
+ * @return record
+ */
+ public static Record convertDataRecordToRecord(final String database,
final String schema, final DataRecord dataRecord) {
+ Map<String, Any> beforeMap = new HashMap<>();
+ Map<String, Any> afterMap = new HashMap<>();
+ for (Column column : dataRecord.getColumns()) {
+ beforeMap.put(column.getName(),
Any.pack(ColumnValueConvertUtil.convertToProtobufMessage(column.getOldValue())));
+ afterMap.put(column.getName(),
Any.pack(ColumnValueConvertUtil.convertToProtobufMessage(column.getValue())));
+ }
+ TableMetaData metaData =
TableMetaData.newBuilder().setDatabase(database).setSchema(Strings.nullToEmpty(schema)).setTableName(dataRecord.getTableName()).build();
+ DataChangeType dataChangeType = DataChangeType.UNKNOWN;
+ if (IngestDataChangeType.INSERT.equals(dataRecord.getType())) {
+ dataChangeType = DataChangeType.INSERT;
+ } else if (IngestDataChangeType.UPDATE.equals(dataRecord.getType())) {
+ dataChangeType = DataChangeType.UPDATE;
+ } else if (IngestDataChangeType.DELETE.equals(dataRecord.getType())) {
+ dataChangeType = DataChangeType.DELETE;
+ }
+ return
DataRecordResult.Record.newBuilder().setTableMetaData(metaData).putAllBefore(beforeMap).putAllAfter(afterMap).setDataChangeType(dataChangeType).build();
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
index 28c38a814a7..9a02e7b853e 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfiguration.java
@@ -49,6 +49,8 @@ public final class YamlCDCJobConfiguration implements
YamlPipelineJobConfigurati
private List<String> jobShardingDataNodes;
+ private boolean decodeWithTX;
+
private int concurrency = 1;
private int retryTimes;
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
index 106f9b9ab2f..14276bbed7b 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/yaml/job/YamlCDCJobConfigurationSwapper.java
@@ -48,6 +48,7 @@ public final class YamlCDCJobConfigurationSwapper implements
YamlConfigurationSw
result.setTablesFirstDataNodes(null == data.getTablesFirstDataNodes()
? null : data.getTablesFirstDataNodes().marshal());
List<String> jobShardingDataNodes = null ==
data.getJobShardingDataNodes() ? null :
data.getJobShardingDataNodes().stream().map(JobDataNodeLine::marshal).collect(Collectors.toList());
result.setJobShardingDataNodes(jobShardingDataNodes);
+ result.setDecodeWithTX(data.isDecodeWithTX());
result.setConcurrency(data.getConcurrency());
result.setRetryTimes(0);
return result;
@@ -61,7 +62,7 @@ public final class YamlCDCJobConfigurationSwapper implements
YamlConfigurationSw
JobDataNodeLine tablesFirstDataNodes = null ==
yamlConfig.getTablesFirstDataNodes() ? null :
JobDataNodeLine.unmarshal(yamlConfig.getTablesFirstDataNodes());
return new CDCJobConfiguration(yamlConfig.getJobId(),
yamlConfig.getDatabase(), yamlConfig.getTableNames(),
yamlConfig.getSubscriptionName(), yamlConfig.getSubscriptionMode(),
yamlConfig.getSourceDatabaseType(),
(ShardingSpherePipelineDataSourceConfiguration)
dataSourceConfigSwapper.swapToObject(yamlConfig.getDataSourceConfiguration()),
tablesFirstDataNodes,
- jobShardingDataNodes, yamlConfig.getConcurrency(),
yamlConfig.getRetryTimes());
+ jobShardingDataNodes, yamlConfig.isDecodeWithTX(),
yamlConfig.getConcurrency(), yamlConfig.getRetryTimes());
}
/**
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
new file mode 100644
index 00000000000..bc15f22efe3
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
+
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import org.apache.shardingsphere.infra.util.reflection.ReflectionUtil;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public final class CDCAckHolderTest {
+
+ @Test
+ public void assertBindAckIdWithPositionAndAck() {
+ CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
+ final Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new
HashMap<>();
+ CDCImporter cdcImporter = mock(CDCImporter.class);
+ importerDataRecordMap.put(cdcImporter, new CDCAckPosition(new
FinishedRecord(new FinishedPosition()), 0));
+ Optional<Map<String, Map<CDCImporter, CDCAckPosition>>>
ackIdImporterMap = ReflectionUtil.getFieldValue(cdcAckHolder,
"ackIdImporterMap");
+ assertTrue(ackIdImporterMap.isPresent());
+ assertTrue(ackIdImporterMap.get().isEmpty());
+ String ackId =
cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
+ assertThat(ackIdImporterMap.get().size(), is(1));
+ cdcAckHolder.ack(ackId);
+ assertTrue(ackIdImporterMap.get().isEmpty());
+ }
+
+ @Test
+ public void assertCleanUpTimeoutAckId() {
+ CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
+ final Map<CDCImporter, CDCAckPosition> importerDataRecordMap = new
HashMap<>();
+ CDCImporter cdcImporter = mock(CDCImporter.class);
+ importerDataRecordMap.put(cdcImporter, new CDCAckPosition(new
FinishedRecord(new FinishedPosition()), 0, System.currentTimeMillis() - 60 *
1000 * 10));
+ cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
+ cdcAckHolder.cleanUp(cdcImporter);
+ Optional<Map<String, Map<CDCImporter, CDCAckPosition>>>
actualAckIdImporterMap = ReflectionUtil.getFieldValue(cdcAckHolder,
"ackIdImporterMap");
+ assertTrue(actualAckIdImporterMap.isPresent());
+ assertTrue(actualAckIdImporterMap.get().isEmpty());
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
index 811ed83d94c..bebcbaef4a0 100644
---
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporterCreatorTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.importer;
+import io.netty.channel.Channel;
import
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
@@ -26,8 +27,11 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.Collections;
+
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public final class CDCImporterCreatorTest {
@@ -37,6 +41,7 @@ public final class CDCImporterCreatorTest {
@Test
public void assertCreateCDCImporter() {
-
assertThat(TypedSPIRegistry.getRegisteredService(ImporterCreator.class,
"CDC").createImporter(importerConfig, new CDCImporterConnector(null), null,
null), instanceOf(CDCImporter.class));
+ CDCImporterConnector importerConnector = new
CDCImporterConnector(mock(Channel.class), "test", 1, Collections.emptyList(),
null);
+
assertThat(TypedSPIRegistry.getRegisteredService(ImporterCreator.class,
"CDC").createImporter(importerConfig, importerConnector, null, null, null),
instanceOf(CDCImporter.class));
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java
new file mode 100644
index 00000000000..b7248148f23
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/generator/DataRecordComparatorGeneratorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.generator;
+
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.junit.Test;
+
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNull;
+
+public final class DataRecordComparatorGeneratorTest {
+
+ @Test
+ public void assertGeneratorIncrementalComparator() {
+ Comparator<DataRecord> dataRecordComparator =
DataRecordComparatorGenerator.generatorIncrementalComparator(new
OpenGaussDatabaseType());
+ List<DataRecord> dataRecords = new LinkedList<>();
+ dataRecords.add(generateDataRecord(1L));
+ dataRecords.add(generateDataRecord(100L));
+ dataRecords.add(generateDataRecord(0L));
+ dataRecords.add(generateDataRecord(null));
+ dataRecords.sort(dataRecordComparator);
+ assertNull(dataRecords.get(0).getCsn());
+ assertThat(dataRecords.get(1).getCsn(), is(0L));
+ assertThat(dataRecords.get(2).getCsn(), is(1L));
+ assertThat(dataRecords.get(3).getCsn(), is(100L));
+ }
+
+ private DataRecord generateDataRecord(final Long csn) {
+ DataRecord result = new DataRecord(new PlaceholderPosition(), 0);
+ result.setCsn(csn);
+ return result;
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
new file mode 100644
index 00000000000..658d8fcdfc3
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtilTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
+import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
+import
org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
+import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import org.junit.Test;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+public final class CDCDataRecordUtilTest {
+
+ @Test
+ public void assertFindMinimumDataRecordAndSavePosition() throws
InterruptedException {
+ final Map<CDCImporter, BlockingQueue<Record>>
actualIncrementalRecordMap = new HashMap<>();
+ ArrayBlockingQueue<Record> queueFirst = new ArrayBlockingQueue<>(5);
+ queueFirst.put(generateDataRecord(0));
+ queueFirst.put(generateDataRecord(2));
+ queueFirst.put(generateDataRecord(4));
+ CDCImporter mockCdcImporterFirst = mock(CDCImporter.class);
+ actualIncrementalRecordMap.put(mockCdcImporterFirst, queueFirst);
+ ArrayBlockingQueue<Record> queueSecond = new ArrayBlockingQueue<>(5);
+ queueSecond.put(generateDataRecord(1));
+ queueSecond.put(generateDataRecord(3));
+ queueSecond.put(generateDataRecord(5));
+ CDCImporter mockCdcImporterSecond = mock(CDCImporter.class);
+ actualIncrementalRecordMap.put(mockCdcImporterSecond, queueSecond);
+ Comparator<DataRecord> dataRecordComparator =
DataRecordComparatorGenerator.generatorIncrementalComparator(new
OpenGaussDatabaseType());
+ final Map<CDCImporter, CDCAckPosition> cdcAckPositionMap = new
HashMap<>();
+ for (long i = 0; i <= 5; i++) {
+ DataRecord minimumDataRecord =
CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(actualIncrementalRecordMap,
dataRecordComparator, cdcAckPositionMap);
+ assertThat(minimumDataRecord.getCsn(), is(i));
+ }
+
assertNull(CDCDataRecordUtil.findMinimumDataRecordAndSavePosition(actualIncrementalRecordMap,
dataRecordComparator, cdcAckPositionMap));
+ }
+
+ private DataRecord generateDataRecord(final long csn) {
+ DataRecord dataRecord = new DataRecord(new PlaceholderPosition(), 0);
+ dataRecord.setCsn(csn);
+ return dataRecord;
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtilTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtilTest.java
new file mode 100644
index 00000000000..bf5dbf1266b
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/ColumnValueConvertUtilTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import com.google.protobuf.BoolValue;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.DoubleValue;
+import com.google.protobuf.FloatValue;
+import com.google.protobuf.Int32Value;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.StringValue;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigDecimalValue;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.BigIntegerValue;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.LocalTimeValue;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.NullValue;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
+import java.util.Date;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertTrue;
+
+public final class ColumnValueConvertUtilTest {
+
+ @Test
+ public void assertConvertToProtobufMessage() {
+ Message actualMessage =
ColumnValueConvertUtil.convertToProtobufMessage(null);
+ assertTrue(actualMessage instanceof NullValue);
+ actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(1);
+ assertTrue(actualMessage instanceof Int32Value);
+ assertThat(((Int32Value) actualMessage).getValue(), is(1));
+ actualMessage = ColumnValueConvertUtil.convertToProtobufMessage((byte)
1);
+ assertTrue(actualMessage instanceof Int32Value);
+ assertThat(((Int32Value) actualMessage).getValue(), is(1));
+ actualMessage =
ColumnValueConvertUtil.convertToProtobufMessage((short) 1);
+ assertTrue(actualMessage instanceof Int32Value);
+ assertThat(((Int32Value) actualMessage).getValue(), is(1));
+ actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(1L);
+ assertTrue(actualMessage instanceof Int64Value);
+ assertThat(((Int64Value) actualMessage).getValue(), is(1L));
+ actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(new
BigInteger("1234"));
+ assertTrue(actualMessage instanceof BigIntegerValue);
+ assertThat(new BigInteger(((BigIntegerValue)
actualMessage).getValue().toByteArray()), is(new BigInteger("1234")));
+ actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(1.0F);
+ assertTrue(actualMessage instanceof FloatValue);
+ assertThat(((FloatValue) actualMessage).getValue(), is(1.0F));
+ actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(1.23);
+ assertTrue(actualMessage instanceof DoubleValue);
+ assertThat(((DoubleValue) actualMessage).getValue(), is(1.23));
+ actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(new
BigDecimal("100"));
+ assertTrue(actualMessage instanceof BigDecimalValue);
+ assertThat(((BigDecimalValue) actualMessage).getValue(), is("100"));
+ actualMessage =
ColumnValueConvertUtil.convertToProtobufMessage("abcd");
+ assertTrue(actualMessage instanceof StringValue);
+ assertThat(((StringValue) actualMessage).getValue(), is("abcd"));
+ actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(true);
+ assertTrue(actualMessage instanceof BoolValue);
+ assertTrue(((BoolValue) actualMessage).getValue());
+ Timestamp now = new Timestamp(System.currentTimeMillis());
+ long epochSecond = now.toInstant().getEpochSecond();
+ actualMessage =
ColumnValueConvertUtil.convertToProtobufMessage(now.toLocalDateTime());
+ assertTrue(actualMessage instanceof com.google.protobuf.Timestamp);
+ assertThat(((com.google.protobuf.Timestamp)
actualMessage).getSeconds(), is(epochSecond));
+ actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(now);
+ assertTrue(actualMessage instanceof com.google.protobuf.Timestamp);
+ assertThat(((com.google.protobuf.Timestamp)
actualMessage).getSeconds(), is(epochSecond));
+ actualMessage = ColumnValueConvertUtil.convertToProtobufMessage(new
Date());
+ assertTrue(actualMessage instanceof com.google.protobuf.Timestamp);
+ assertThat(((com.google.protobuf.Timestamp)
actualMessage).getSeconds(), is(epochSecond));
+ actualMessage =
ColumnValueConvertUtil.convertToProtobufMessage(now.toInstant());
+ assertTrue(actualMessage instanceof com.google.protobuf.Timestamp);
+ assertThat(((com.google.protobuf.Timestamp) actualMessage).getNanos(),
is(now.toInstant().getNano()));
+ actualMessage =
ColumnValueConvertUtil.convertToProtobufMessage(now.toLocalDateTime().toLocalTime());
+ assertTrue(actualMessage instanceof LocalTimeValue);
+ assertThat(((LocalTimeValue) actualMessage).getValue(),
is(now.toLocalDateTime().toLocalTime().toString()));
+ actualMessage =
ColumnValueConvertUtil.convertToProtobufMessage("123456".getBytes());
+ assertTrue(actualMessage instanceof BytesValue);
+ assertThat(((BytesValue) actualMessage).getValue().toByteArray(),
is("123456".getBytes()));
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
index 3eac0066d39..00b3c76c452 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCRequestProtocol.proto
@@ -66,34 +66,32 @@ message CreateSubscriptionRequest {
string schema = 1;
string name = 2;
}
- repeated TableName tableNames = 2;
- string subscriptionName = 3;
- SubscriptionMode subscriptionMode = 4;
-
+ repeated TableName table_names = 2;
+ string subscription_name = 3;
enum SubscriptionMode {
UNKNOWN = 0;
INCREMENTAL = 1;
FULL = 2;
}
+ SubscriptionMode subscription_mode = 4;
+ bool incremental_global_orderly = 5;
}
message StartSubscriptionRequest {
string database = 1;
- string subscriptionName = 2;
+ string subscription_name = 2;
}
message StopSubscriptionRequest {
string database = 1;
- string subscriptionName = 2;
+ string subscription_name = 2;
}
message DropSubscriptionRequest {
string database = 1;
- string subscriptionName = 2;
+ string subscription_name = 2;
}
message AckRequest {
- string database = 1;
- string subscriptionName = 2;
string ack_id = 3;
}
diff --git
a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
index a583f3f664c..49c82f09433 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
+++ b/kernel/data-pipeline/cdc/protocol/src/main/proto/CDCResponseProtocol.proto
@@ -47,7 +47,7 @@ message ServerGreetingResult {
}
message CreateSubscriptionResult {
- string subscriptionName = 1;
+ string subscription_name = 1;
bool existing = 2;
}
@@ -63,10 +63,18 @@ message BigDecimalValue {
string value = 1;
}
+message LocalTimeValue {
+ string value = 1;
+}
+
message ClobValue {
string value = 1;
}
+message BlobValue {
+ bytes value = 1;
+}
+
message DataRecordResult {
message Record {
map<string, google.protobuf.Any> before = 1;
@@ -74,9 +82,9 @@ message DataRecordResult {
message TableMetaData {
string database = 1;
optional string schema = 2;
- string tableName = 3;
+ string table_name = 3;
}
- TableMetaData tableMetaData = 3;
+ TableMetaData table_meta_data = 3;
int64 transaction_commit_millis = 4;
enum DataChangeType {
UNKNOWN = 0;
@@ -90,8 +98,9 @@ message DataRecordResult {
ALTER_INDEX = 8;
DROP_INDEX = 9;
}
- bool isDDL = 6;
- optional string ddlSQL = 7;
+ DataChangeType data_change_type = 5;
+ bool is_ddl = 6;
+ optional string ddl_SQL = 7;
}
string ack_id = 1;
repeated Record records = 2;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
index ff8ad06e468..ba0f86cc7d0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DataSourceImporterCreator.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
/**
@@ -32,7 +33,7 @@ public final class DataSourceImporterCreator implements
ImporterCreator {
@Override
public Importer createImporter(final ImporterConfiguration importerConfig,
final ImporterConnector importerConnector,
final PipelineChannel channel,
- final PipelineJobProgressListener
jobProgressListener) {
+ final PipelineJobProgressListener
jobProgressListener, final ImporterType importerType) {
return new DataSourceImporter(importerConfig, importerConnector,
channel, jobProgressListener);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
index 02d8146909a..4d6a2d26969 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/record/RecordUtil.java
@@ -19,8 +19,10 @@ package org.apache.shardingsphere.data.pipeline.core.record;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import java.util.ArrayList;
import java.util.List;
@@ -80,4 +82,21 @@ public final class RecordUtil {
}
return result;
}
+
+ /**
+ * Get last normal record.
+ *
+ * @param records records
+ * @return last normal record.
+ */
+ public static Record getLastNormalRecord(final List<Record> records) {
+ for (int index = records.size() - 1; index >= 0; index--) {
+ Record record = records.get(index);
+ if (record.getPosition() instanceof PlaceholderPosition) {
+ continue;
+ }
+ return record;
+ }
+ return null;
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 5e08786080d..3d0e3df1518 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -36,6 +36,7 @@ import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
@@ -96,7 +97,8 @@ public final class IncrementalTask implements PipelineTask,
AutoCloseable {
final
PipelineJobProgressListener jobProgressListener) {
Collection<Importer> result = new LinkedList<>();
for (int i = 0; i < concurrency; i++) {
-
result.add(TypedSPIRegistry.getRegisteredService(ImporterCreator.class,
importerConnector.getType()).createImporter(importerConfig, importerConnector,
channel, jobProgressListener));
+
result.add(TypedSPIRegistry.getRegisteredService(ImporterCreator.class,
importerConnector.getType()).createImporter(importerConfig, importerConnector,
channel, jobProgressListener,
+ ImporterType.INCREMENTAL));
}
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 6ee2ead917e..01fc4c91e04 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -26,7 +26,6 @@ import
org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
@@ -34,7 +33,9 @@ import
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskPr
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
+import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
@@ -42,7 +43,6 @@ import
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
import javax.sql.DataSource;
import java.util.Collection;
import java.util.LinkedList;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
@@ -77,7 +77,8 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
this.inventoryImporterExecuteEngine = inventoryImporterExecuteEngine;
channel = createChannel(pipelineChannelCreator);
dumper = new InventoryDumper(inventoryDumperConfig, channel,
sourceDataSource, sourceMetaDataLoader);
- importer =
TypedSPIRegistry.getRegisteredService(ImporterCreator.class,
importerConnector.getType()).createImporter(importerConfig, importerConnector,
channel, jobProgressListener);
+ importer =
TypedSPIRegistry.getRegisteredService(ImporterCreator.class,
importerConnector.getType()).createImporter(importerConfig, importerConnector,
channel, jobProgressListener,
+ ImporterType.INVENTORY);
position = inventoryDumperConfig.getPosition();
}
@@ -120,24 +121,13 @@ public final class InventoryTask implements PipelineTask,
AutoCloseable {
private PipelineChannel createChannel(final PipelineChannelCreator
pipelineChannelCreator) {
return pipelineChannelCreator.createPipelineChannel(1, records -> {
- Record lastNormalRecord = getLastNormalRecord(records);
+ Record lastNormalRecord = RecordUtil.getLastNormalRecord(records);
if (null != lastNormalRecord) {
position = lastNormalRecord.getPosition();
}
});
}
- private Record getLastNormalRecord(final List<Record> records) {
- for (int index = records.size() - 1; index >= 0; index--) {
- Record record = records.get(index);
- if (record.getPosition() instanceof PlaceholderPosition) {
- continue;
- }
- return record;
- }
- return null;
- }
-
@Override
public void stop() {
dumper.stop();
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
index dec16de00cc..0381f69b1a3 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -34,7 +34,7 @@ public final class OpenGaussIncrementalDumperCreator
implements IncrementalDumpe
@Override
public IncrementalDumper createIncrementalDumper(final DumperConfiguration
dumperConfig, final IngestPosition<WALPosition> position,
final PipelineChannel
channel, final PipelineTableMetaDataLoader metaDataLoader) {
- return new OpenGaussWALDumper(dumperConfig, position, channel,
metaDataLoader, false);
+ return new OpenGaussWALDumper(dumperConfig, position, channel,
metaDataLoader);
}
@Override
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index 4830429699d..cd144dec30d 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -67,7 +67,7 @@ public final class OpenGaussWALDumper extends
AbstractLifecycleExecutor implemen
private final List<AbstractRowEvent> rowEvents = new LinkedList<>();
public OpenGaussWALDumper(final DumperConfiguration dumperConfig, final
IngestPosition<WALPosition> position,
- final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader, final boolean decodeWithTX) {
+ final PipelineChannel channel, final
PipelineTableMetaDataLoader metaDataLoader) {
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(dumperConfig.getDataSourceConfig().getClass()),
() -> new
UnsupportedSQLOperationException("PostgreSQLWALDumper only support
PipelineDataSourceConfiguration"));
this.dumperConfig = dumperConfig;
@@ -75,7 +75,7 @@ public final class OpenGaussWALDumper extends
AbstractLifecycleExecutor implemen
this.channel = channel;
walEventConverter = new WALEventConverter(dumperConfig,
metaDataLoader);
logicalReplication = new OpenGaussLogicalReplication();
- this.decodeWithTX = decodeWithTX;
+ this.decodeWithTX = dumperConfig.isDecodeWithTX();
}
@Override
diff --git
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index 88f17388750..44b16467d64 100644
---
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -20,15 +20,20 @@ package org.apache.shardingsphere.proxy.backend.handler.cdc;
import com.google.common.base.Strings;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.CreateSubscriptionJobParameter;
import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckHolder;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.connector.CDCImporterConnector;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import
org.apache.shardingsphere.data.pipeline.cdc.generator.DataRecordComparatorGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
@@ -45,6 +50,8 @@ import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.apache.shardingsphere.sharding.rule.TableRule;
+import java.sql.SQLException;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -66,13 +73,13 @@ public final class CDCBackendHandler {
* @return CDC response
*/
public CDCResponse createSubscription(final CDCRequest request) {
- CreateSubscriptionRequest subscriptionRequest =
request.getCreateSubscription();
- ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(subscriptionRequest.getDatabase());
+ CreateSubscriptionRequest createSubscription =
request.getCreateSubscription();
+ ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(createSubscription.getDatabase());
if (null == database) {
- return CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists",
subscriptionRequest.getDatabase()));
+ return CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists",
createSubscription.getDatabase()));
}
List<String> tableNames = new LinkedList<>();
- for (TableName each : subscriptionRequest.getTableNamesList()) {
+ for (TableName each : createSubscription.getTableNamesList()) {
tableNames.add(Strings.isNullOrEmpty(each.getSchema()) ?
each.getName() : String.join(".", each.getSchema(), each.getName()));
}
Optional<ShardingRule> rule =
database.getRuleMetaData().getRules().stream().filter(each -> each instanceof
ShardingRule).map(each -> (ShardingRule) each).findFirst();
@@ -83,14 +90,14 @@ public final class CDCBackendHandler {
for (String each : tableNames) {
actualDataNodesMap.put(each, getActualDataNodes(rule.get(), each));
}
- CreateSubscriptionJobParameter parameter = new
CreateSubscriptionJobParameter(subscriptionRequest.getDatabase(), tableNames,
subscriptionRequest.getSubscriptionName(),
- subscriptionRequest.getSubscriptionMode().name(),
actualDataNodesMap);
+ CreateSubscriptionJobParameter parameter = new
CreateSubscriptionJobParameter(createSubscription.getDatabase(), tableNames,
createSubscription.getSubscriptionName(),
+ createSubscription.getSubscriptionMode().name(),
actualDataNodesMap, createSubscription.getIncrementalGlobalOrderly());
if (jobAPI.createJob(parameter)) {
return
CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
-
.setSubscriptionName(subscriptionRequest.getSubscriptionName()).setExisting(false).build()).build();
+
.setSubscriptionName(createSubscription.getSubscriptionName()).setExisting(false).build()).build();
} else {
return
CDCResponseGenerator.succeedBuilder(request.getRequestId()).setCreateSubscriptionResult(CreateSubscriptionResult.newBuilder()
-
.setSubscriptionName(subscriptionRequest.getSubscriptionName()).setExisting(true).build()).build();
+
.setSubscriptionName(createSubscription.getSubscriptionName()).setExisting(true).build()).build();
}
}
@@ -111,17 +118,20 @@ public final class CDCBackendHandler {
public CDCResponse startSubscription(final CDCRequest request, final
Channel channel, final CDCConnectionContext connectionContext) {
StartSubscriptionRequest startSubscriptionRequest =
request.getStartSubscription();
String jobId = jobAPI.marshalJobId(new
CDCJobId(startSubscriptionRequest.getDatabase(),
startSubscriptionRequest.getSubscriptionName()));
- JobConfigurationPOJO jobConfigPOJO =
PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
- if (null == jobConfigPOJO) {
+ CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration)
jobAPI.getJobConfiguration(jobId);
+ if (null == cdcJobConfig) {
return CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, String.format("the %s job config
doesn't exist",
startSubscriptionRequest.getSubscriptionName()));
}
- if (!jobConfigPOJO.isDisabled()) {
- return CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, String.format("the %s is being used",
startSubscriptionRequest.getSubscriptionName()));
- }
+ JobConfigurationPOJO jobConfigPOJO =
PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
+ // TODO, ensure that there is only one consumer at a time, job config
disable may not be updated when the program is forced to close
jobConfigPOJO.setDisabled(false);
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
- CDCJob job = new CDCJob(new CDCImporterConnector(channel));
+ ShardingSphereDatabase database =
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabase());
+ Comparator<DataRecord> dataRecordComparator =
cdcJobConfig.isDecodeWithTX()
+ ?
DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
+ : null;
+ CDCJob job = new CDCJob(new CDCImporterConnector(channel,
cdcJobConfig.getDatabase(), cdcJobConfig.getJobShardingCount(),
cdcJobConfig.getTableNames(), dataRecordComparator));
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job,
jobConfigPOJO.toJobConfiguration());
job.setJobBootstrap(oneOffJobBootstrap);
@@ -137,9 +147,32 @@ public final class CDCBackendHandler {
* @param jobId job id
*/
public void stopSubscription(final String jobId) {
+ if (Strings.isNullOrEmpty(jobId)) {
+ log.warn("job id is null or empty, ignored");
+ return;
+ }
PipelineJobCenter.stop(jobId);
JobConfigurationPOJO jobConfig =
PipelineAPIFactory.getJobConfigurationAPI().getJobConfiguration(jobId);
jobConfig.setDisabled(true);
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfig);
}
+
+ /**
+ * Drop subscription.
+ *
+ * @param jobId job id.
+ * @throws SQLException sql exception
+ */
+ public void dropSubscription(final String jobId) throws SQLException {
+ jobAPI.rollback(jobId);
+ }
+
+ /**
+ * Process ack.
+ *
+ * @param ackRequest ack request
+ */
+ public void processAck(final AckRequest ackRequest) {
+ CDCAckHolder.getInstance().ack(ackRequest.getAckId());
+ }
}
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 451e6ac665e..965e0ea75cf 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
@@ -44,6 +45,7 @@ import
org.apache.shardingsphere.proxy.backend.handler.cdc.CDCBackendHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.sql.SQLException;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
@@ -97,9 +99,10 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
stopStartSubscription(ctx, request, connectionContext);
break;
case DROP_SUBSCRIPTION:
- dropStartSubscription(ctx, request);
+ dropStartSubscription(ctx, request, connectionContext);
break;
case ACK_REQUEST:
+ processAckRequest(ctx, request);
break;
default:
log.warn("Cannot handle this type of request {}", request);
@@ -159,19 +162,38 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
if (startSubscriptionRequest.getDatabase().isEmpty() ||
startSubscriptionRequest.getSubscriptionName().isEmpty()) {
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal start subscription request
parameter"))
.addListener(ChannelFutureListener.CLOSE);
+ return;
}
CDCResponse response = backendHandler.startSubscription(request,
ctx.channel(), connectionContext);
ctx.writeAndFlush(response);
}
private void stopStartSubscription(final ChannelHandlerContext ctx, final
CDCRequest request, final CDCConnectionContext connectionContext) {
- // TODO waiting for pipeline refactoring finished
+ backendHandler.stopSubscription(connectionContext.getJobId());
connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
}
- private void dropStartSubscription(final ChannelHandlerContext ctx, final
CDCRequest request) {
- // TODO waiting for pipeline refactoring finished
-
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ private void dropStartSubscription(final ChannelHandlerContext ctx, final
CDCRequest request, final CDCConnectionContext connectionContext) {
+ try {
+ backendHandler.dropSubscription(connectionContext.getJobId());
+
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
+ } catch (final SQLException ex) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, ex.getMessage()));
+ }
+ }
+
+ private void processAckRequest(final ChannelHandlerContext ctx, final
CDCRequest request) {
+ if (!request.hasAckRequest()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss ack request body"))
+ .addListener(ChannelFutureListener.CLOSE);
+ return;
+ }
+ AckRequest ackRequest = request.getAckRequest();
+ if (ackRequest.getAckId().isEmpty()) {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal ack request parameter"));
+ return;
+ }
+ backendHandler.processAck(ackRequest);
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
index a227fc74322..c672c8497fd 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureImporterCreator.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.api.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterType;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
/**
@@ -32,7 +33,7 @@ public final class FixtureImporterCreator implements
ImporterCreator {
@Override
public Importer createImporter(final ImporterConfiguration importerConfig,
final ImporterConnector importerConnector,
final PipelineChannel channel,
- final PipelineJobProgressListener
jobProgressListener) {
+ final PipelineJobProgressListener
jobProgressListener, final ImporterType importerType) {
return new FixtureImporter(importerConfig, importerConnector, channel,
jobProgressListener);
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/record/RecordUtilTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/record/RecordUtilTest.java
index 2a83bddaaea..dac54263d90 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/record/RecordUtilTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/record/RecordUtilTest.java
@@ -17,18 +17,25 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.record;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.junit.Test;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
-import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public final class RecordUtilTest {
@@ -67,4 +74,14 @@ public final class RecordUtilTest {
result.addColumn(new Column("c3", "", true, false));
return result;
}
+
+ @Test
+ public void assertGetLastNormalRecord() {
+ List<Record> actual = Arrays.asList(new DataRecord(new
IntegerPrimaryKeyPosition(0, 1), 0), new PlaceholderRecord(new
PlaceholderPosition()));
+ Record expected = RecordUtil.getLastNormalRecord(actual);
+ assertThat(expected, instanceOf(DataRecord.class));
+ actual = Arrays.asList(new DataRecord(new IntegerPrimaryKeyPosition(0,
1), 0), new PlaceholderRecord(new PlaceholderPosition()), new
FinishedRecord(new FinishedPosition()));
+ expected = RecordUtil.getLastNormalRecord(actual);
+ assertThat(expected, instanceOf(FinishedRecord.class));
+ }
}