sandynz commented on code in PR #22739:
URL: https://github.com/apache/shardingsphere/pull/22739#discussion_r1042953244


##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/subscribe/CDCSubscriptionSubscribe.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.subscribe;
+
+import com.google.common.eventbus.Subscribe;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPIFactory;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent;
+
+/**
+ * CDC subscription subscribe.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class CDCSubscriptionSubscribe {
+    
+    private static final CDCSubscriptionSubscribe INSTANCE = new 
CDCSubscriptionSubscribe();
+    
+    private final CDCJobAPI jobAPI = CDCJobAPIFactory.getInstance();
+    
+    /**
+     * Get instance.
+     *
+     * @return instance
+     */
+    public static CDCSubscriptionSubscribe getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Start resharding job.
+     *
+     * @param event start resharding event.

Review Comment:
   Javadoc does not match method.



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java:
##########
@@ -47,8 +47,8 @@ public void userEventTriggered(final ChannelHandlerContext 
ctx, final Object evt
     
     private CreateSubscriptionRequest buildCreateSubscriptionRequest() {
         // TODO the parameter shouldn't hard code, will be fixed when completed
-        TableName tableName = TableName.newBuilder().build();
-        return 
CreateSubscriptionRequest.newBuilder().setSubscriptionMode(SubscriptionMode.INCREMENTAL).setSubscriptionName("sharding_db").setDatabase("sharding_db")
+        TableName tableName = 
TableName.newBuilder().setName("t_order").build();
+        return 
CreateSubscriptionRequest.newBuilder().setSubscriptionMode(SubscriptionMode.INCREMENTAL).setSubscriptionName("subscribe_sharding_db").setDatabase("sharding_db")
                 .addTableNames(tableName).build();

Review Comment:
   These test code could be in local test class and not committed (it could be 
put to E2E test later), `cdc-client` could be used as library



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.api.impl;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import org.apache.commons.codec.digest.DigestUtils;
+import 
org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
+import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent;
+
+import javax.sql.DataSource;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * CDC job API impl.
+ */
+public final class CDCJobAPIImpl extends AbstractPipelineJobAPIImpl implements 
CDCJobAPI {
+    
+    private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = 
new YamlDataSourceConfigurationSwapper();
+    
+    private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = 
new YamlRuleConfigurationSwapperEngine();
+    
+    private final YamlPipelineDataSourceConfigurationSwapper 
pipelineDataSourceConfigSwapper = new 
YamlPipelineDataSourceConfigurationSwapper();
+    
+    @Override
+    public String createJobAndStart(final CreateSubscriptionJobEvent event) {
+        YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
+        Map<String, String> tablesFirstDataNodesMap = new 
HashMap<>(event.getDataNodesMap().size(), 1);
+        for (Entry<String, List<DataNode>> entry : 
event.getDataNodesMap().entrySet()) {
+            tablesFirstDataNodesMap.put(entry.getKey(), new 
JobDataNodeLine(Collections.singleton(new JobDataNodeEntry(entry.getKey(), 
entry.getValue().subList(0, 1)))).marshal());
+        }
+        yamlJobConfig.setDatabase(event.getDatabase());
+        yamlJobConfig.setTableNames(event.getSubscribeTableNames());
+        yamlJobConfig.setSubscriptionName(event.getSubscriptionName());
+        yamlJobConfig.setSubscriptionMode(event.getSubscriptionMode());
+        ShardingSphereDatabase database = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(event.getDatabase());
+        
yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
+        yamlJobConfig.setTablesFirstDataNodesMap(tablesFirstDataNodesMap);
+        extendYamlJobConfiguration(yamlJobConfig);
+        CDCJobConfiguration jobConfig = new 
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
+        start(jobConfig);
+        return jobConfig.getJobId();
+    }
+    
+    private ShardingSpherePipelineDataSourceConfiguration 
getDataSourceConfiguration(final ShardingSphereDatabase database) {
+        Map<String, Map<String, Object>> dataSourceProps = new HashMap<>();
+        for (Entry<String, DataSource> entry : 
database.getResourceMetaData().getDataSources().entrySet()) {
+            dataSourceProps.put(entry.getKey(), 
dataSourceConfigSwapper.swapToMap(DataSourcePropertiesCreator.create(entry.getValue())));
+        }
+        YamlRootConfiguration targetRootConfig = new YamlRootConfiguration();
+        targetRootConfig.setDatabaseName(database.getName());
+        targetRootConfig.setDataSources(dataSourceProps);
+        Collection<YamlRuleConfiguration> yamlRuleConfigurations = 
ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations());
+        targetRootConfig.setRules(yamlRuleConfigurations);
+        return new 
ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
+    }
+    
+    @Override
+    public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
+        YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) 
yamlJobConfig;
+        if (null == yamlJobConfig.getJobId()) {
+            config.setJobId(generateJobId(config));
+        }
+        if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
+            PipelineDataSourceConfiguration sourceDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
+                    config.getDataSourceConfiguration().getParameter());
+            
config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
+        }
+    }
+    
+    private String generateJobId(final YamlCDCJobConfiguration config) {
+        CDCJobId jobId = new CDCJobId(config.getDatabase(), 
config.getSubscriptionName());
+        return marshalJobId(jobId);
+    }
+    
+    @Override
+    public PipelineTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public PipelineProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public PipelineJobConfiguration getJobConfiguration(final String jobId) {
+        return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+    }
+    
+    @Override
+    protected PipelineJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
+        return new 
YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+    }
+    
+    @Override
+    public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        // TODO to be implement
+    }
+    
+    @Override
+    public PipelineJobItemProgress getJobItemProgress(final String jobId, 
final int shardingItem) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
+        // TODO to be implement
+    }
+    
+    @Override
+    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {

Review Comment:
   It's better to put it together with jobId related methods, e.g. generateJobId



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.config.job;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+
+import java.util.List;
+
+/**
+ * CDC job configuration.
+ */
+@Getter
+@RequiredArgsConstructor
+public final class CDCJobConfiguration implements PipelineJobConfiguration {
+    
+    private final String jobId;
+    
+    private final String database;
+    
+    private final List<String> tableNames;
+    
+    private final String subscriptionName;
+    
+    private final String subscriptionMode;
+    
+    private final String sourceDatabaseType;
+    
+    private final PipelineDataSourceConfiguration dataSourceConfiguration;
+    
+    @Override
+    public int getJobShardingCount() {
+        return 1;

Review Comment:
   Seems job sharding count won't be always `1`, does it need TODO?



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.api.impl;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import org.apache.commons.codec.digest.DigestUtils;
+import 
org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
+import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
+import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent;
+
+import javax.sql.DataSource;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * CDC job API impl.
+ */
+public final class CDCJobAPIImpl extends AbstractPipelineJobAPIImpl implements 
CDCJobAPI {
+    
+    private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = 
new YamlDataSourceConfigurationSwapper();
+    
+    private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = 
new YamlRuleConfigurationSwapperEngine();
+    
+    private final YamlPipelineDataSourceConfigurationSwapper 
pipelineDataSourceConfigSwapper = new 
YamlPipelineDataSourceConfigurationSwapper();
+    
+    @Override
+    public String createJobAndStart(final CreateSubscriptionJobEvent event) {
+        YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration();
+        Map<String, String> tablesFirstDataNodesMap = new 
HashMap<>(event.getDataNodesMap().size(), 1);
+        for (Entry<String, List<DataNode>> entry : 
event.getDataNodesMap().entrySet()) {
+            tablesFirstDataNodesMap.put(entry.getKey(), new 
JobDataNodeLine(Collections.singleton(new JobDataNodeEntry(entry.getKey(), 
entry.getValue().subList(0, 1)))).marshal());
+        }
+        yamlJobConfig.setDatabase(event.getDatabase());
+        yamlJobConfig.setTableNames(event.getSubscribeTableNames());
+        yamlJobConfig.setSubscriptionName(event.getSubscriptionName());
+        yamlJobConfig.setSubscriptionMode(event.getSubscriptionMode());
+        ShardingSphereDatabase database = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(event.getDatabase());
+        
yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database)));
+        yamlJobConfig.setTablesFirstDataNodesMap(tablesFirstDataNodesMap);
+        extendYamlJobConfiguration(yamlJobConfig);
+        CDCJobConfiguration jobConfig = new 
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
+        start(jobConfig);
+        return jobConfig.getJobId();
+    }
+    
+    private ShardingSpherePipelineDataSourceConfiguration 
getDataSourceConfiguration(final ShardingSphereDatabase database) {
+        Map<String, Map<String, Object>> dataSourceProps = new HashMap<>();
+        for (Entry<String, DataSource> entry : 
database.getResourceMetaData().getDataSources().entrySet()) {
+            dataSourceProps.put(entry.getKey(), 
dataSourceConfigSwapper.swapToMap(DataSourcePropertiesCreator.create(entry.getValue())));
+        }
+        YamlRootConfiguration targetRootConfig = new YamlRootConfiguration();
+        targetRootConfig.setDatabaseName(database.getName());
+        targetRootConfig.setDataSources(dataSourceProps);
+        Collection<YamlRuleConfiguration> yamlRuleConfigurations = 
ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations());
+        targetRootConfig.setRules(yamlRuleConfigurations);
+        return new 
ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
+    }
+    
+    @Override
+    public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
+        YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) 
yamlJobConfig;
+        if (null == yamlJobConfig.getJobId()) {
+            config.setJobId(generateJobId(config));
+        }
+        if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
+            PipelineDataSourceConfiguration sourceDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
+                    config.getDataSourceConfiguration().getParameter());
+            
config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
+        }
+    }
+    
+    private String generateJobId(final YamlCDCJobConfiguration config) {
+        CDCJobId jobId = new CDCJobId(config.getDatabase(), 
config.getSubscriptionName());
+        return marshalJobId(jobId);
+    }
+    
+    @Override
+    public PipelineTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public PipelineProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public PipelineJobConfiguration getJobConfiguration(final String jobId) {
+        return getJobConfiguration(getElasticJobConfigPOJO(jobId));
+    }
+    
+    @Override
+    protected PipelineJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
+        return new 
YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+    }
+    
+    @Override
+    public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
+        // TODO to be implement
+    }
+    
+    @Override
+    public PipelineJobItemProgress getJobItemProgress(final String jobId, 
final int shardingItem) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
+        // TODO to be implement
+    }
+    
+    @Override
+    protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
+        CDCJobId jobId = (CDCJobId) pipelineJobId;
+        String text = Joiner.on('|').join(jobId.getDatabaseName(), 
jobId.getSubscriptionName());
+        return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8));
+    }
+    
+    @Override
+    protected PipelineJobInfo getJobInfo(final String jobId) {
+        // TODO to be implement
+        return null;
+    }
+    
+    @Override
+    protected String getJobClassName() {
+        return CDCJob.class.getName();
+    }
+    
+    @Override
+    protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final 
PipelineJobConfiguration jobConfig) {

Review Comment:
   It's better to put it together with jobConfiguration methods, e.g. 
getJobConfiguration



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/listener/CDCContextManagerLifecycleListener.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.cdc.core.subscribe.CDCSubscriptionSubscribe;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import 
org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
+
+/**
+ * CDC context manager lifecycle listener.
+ */
+@Slf4j
+public final class CDCContextManagerLifecycleListener implements 
ContextManagerLifecycleListener {

Review Comment:
   We could remove it and related configuration if event bus event is not 
required



##########
proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.handler.cdc;
+
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
+import 
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * CDC backend handler.
+ */
+@Slf4j
+public final class CDCBackendHandler {
+    
+    /**
+     * Create subscription.
+     *
+     * @param request CDC request
+     * @return CDC response
+     */
+    public CDCResponse createSubscription(final CDCRequest request) {
+        CreateSubscriptionRequest subscriptionRequest = 
request.getCreateSubscription();
+        ShardingSphereDatabase database = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(subscriptionRequest.getDatabase());
+        if (null == database) {
+            return CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", 
subscriptionRequest.getDatabase()));
+        }
+        List<String> tableNames = new LinkedList<>();
+        for (TableName each : subscriptionRequest.getTableNamesList()) {
+            tableNames.add(Strings.isNullOrEmpty(each.getSchema()) ? 
each.getName() : String.join(".", each.getSchema(), each.getName()));
+        }
+        Optional<ShardingRule> rule = 
database.getRuleMetaData().getRules().stream().filter(each -> each instanceof 
ShardingRule).map(each -> (ShardingRule) each).findFirst();
+        if (!rule.isPresent()) {
+            return CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule");
+        }
+        Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
+        for (String each : tableNames) {
+            actualDataNodesMap.put(each, getActualDataNodes(rule.get(), each));
+        }
+        CreateSubscriptionJobEvent event = new 
CreateSubscriptionJobEvent(subscriptionRequest.getDatabase(), tableNames, 
subscriptionRequest.getSubscriptionName(),
+                subscriptionRequest.getSubscriptionMode().name(), 
actualDataNodesMap);
+        
ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().post(event);

Review Comment:
   Could we remove event bus event, just invoke underlying class? proxy-backend 
module already depends on cdc-core module.



##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.handler.cdc;
+
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
+import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+public final class CDCBackendHandlerTest {
+    
+    private static MockedStatic<PipelineContext> pipelineContextMocked;
+    
+    private final CDCBackendHandler handler = new CDCBackendHandler();
+    
+    @BeforeClass
+    public static void beforeClass() {
+        MetaDataContexts metaDataContexts = new 
MetaDataContexts(mock(MetaDataPersistService.class),
+                new ShardingSphereMetaData(getDatabases(), 
mock(ShardingSphereRuleMetaData.class), new ConfigurationProperties(new 
Properties())));
+        ContextManager contextManager = new ContextManager(metaDataContexts, 
mock(InstanceContext.class));
+        pipelineContextMocked = mockStatic(PipelineContext.class);
+        
pipelineContextMocked.when(PipelineContext::getContextManager).thenReturn(contextManager);
+    }
+    
+    private static Map<String, ShardingSphereDatabase> getDatabases() {
+        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, 
RETURNS_DEEP_STUBS);
+        when(database.getName()).thenReturn("sharding_db");
+        when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
+        Set<ShardingSphereRule> shardingRule = 
Collections.singleton(mock(ShardingSphereRule.class));
+        when(database.getRuleMetaData().getRules()).thenReturn(shardingRule);
+        Map<String, ShardingSphereDatabase> result = new LinkedHashMap<>(1, 1);
+        result.put("sharding_db", database);
+        return result;
+    }
+    
+    @AfterClass
+    public static void afterClass() {
+        pipelineContextMocked.close();
+    }
+    
+    @Test
+    public void assertCreateSubscriptionFailed() {
+        CDCRequest request = 
CDCRequest.newBuilder().setRequestId("1").setCreateSubscription(CreateSubscriptionRequest.newBuilder().setDatabase("none")).build();
+        CDCResponse actualResponse = handler.createSubscription(request);
+        assertThat(actualResponse.getStatus(), is(Status.FAILED));
+    }
+    
+    @Test
+    public void assertCreateSubscriptionSucceed() {
+        CDCRequest request = 
CDCRequest.newBuilder().setRequestId("1").setCreateSubscription(CreateSubscriptionRequest.newBuilder().setDatabase("sharding_db")).build();
+        CDCResponse actualResponse = handler.createSubscription(request);
+        assertThat(actualResponse.getStatus(), is(Status.FAILED));
+    }

Review Comment:
   `is(Status.FAILED)` doesn't match method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to