sandynz commented on code in PR #22739: URL: https://github.com/apache/shardingsphere/pull/22739#discussion_r1042953244
########## kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/subscribe/CDCSubscriptionSubscribe.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.cdc.core.subscribe; + +import com.google.common.eventbus.Subscribe; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI; +import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPIFactory; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent; + +/** + * CDC subscription subscribe. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class CDCSubscriptionSubscribe { + + private static final CDCSubscriptionSubscribe INSTANCE = new CDCSubscriptionSubscribe(); + + private final CDCJobAPI jobAPI = CDCJobAPIFactory.getInstance(); + + /** + * Get instance. + * + * @return instance + */ + public static CDCSubscriptionSubscribe getInstance() { + return INSTANCE; + } + + /** + * Start resharding job. + * + * @param event start resharding event. Review Comment: Javadoc does not match method. 
########## kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/SubscriptionRequestHandler.java: ########## @@ -47,8 +47,8 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt private CreateSubscriptionRequest buildCreateSubscriptionRequest() { // TODO the parameter shouldn't hard code, will be fixed when completed - TableName tableName = TableName.newBuilder().build(); - return CreateSubscriptionRequest.newBuilder().setSubscriptionMode(SubscriptionMode.INCREMENTAL).setSubscriptionName("sharding_db").setDatabase("sharding_db") + TableName tableName = TableName.newBuilder().setName("t_order").build(); + return CreateSubscriptionRequest.newBuilder().setSubscriptionMode(SubscriptionMode.INCREMENTAL).setSubscriptionName("subscribe_sharding_db").setDatabase("sharding_db") .addTableNames(tableName).build(); Review Comment: These test code could be in local test class and not committed (it could be put to E2E test later), `cdc-client` could be used as library ########## kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.cdc.api.impl; + +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext; +import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext; +import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry; +import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; +import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId; +import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress; +import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI; +import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; +import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; +import 
org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob; +import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl; +import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext; +import org.apache.shardingsphere.data.pipeline.spi.job.JobType; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; +import org.apache.shardingsphere.infra.datanode.DataNode; +import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator; +import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; +import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration; +import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration; +import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; +import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent; + +import javax.sql.DataSource; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * CDC job API impl. 
+ */ +public final class CDCJobAPIImpl extends AbstractPipelineJobAPIImpl implements CDCJobAPI { + + private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); + + private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine(); + + private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper(); + + @Override + public String createJobAndStart(final CreateSubscriptionJobEvent event) { + YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration(); + Map<String, String> tablesFirstDataNodesMap = new HashMap<>(event.getDataNodesMap().size(), 1); + for (Entry<String, List<DataNode>> entry : event.getDataNodesMap().entrySet()) { + tablesFirstDataNodesMap.put(entry.getKey(), new JobDataNodeLine(Collections.singleton(new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1)))).marshal()); + } + yamlJobConfig.setDatabase(event.getDatabase()); + yamlJobConfig.setTableNames(event.getSubscribeTableNames()); + yamlJobConfig.setSubscriptionName(event.getSubscriptionName()); + yamlJobConfig.setSubscriptionMode(event.getSubscriptionMode()); + ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(event.getDatabase()); + yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database))); + yamlJobConfig.setTablesFirstDataNodesMap(tablesFirstDataNodesMap); + extendYamlJobConfiguration(yamlJobConfig); + CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig); + start(jobConfig); + return jobConfig.getJobId(); + } + + private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) { + Map<String, Map<String, Object>> dataSourceProps = new HashMap<>(); + for 
(Entry<String, DataSource> entry : database.getResourceMetaData().getDataSources().entrySet()) { + dataSourceProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(DataSourcePropertiesCreator.create(entry.getValue()))); + } + YamlRootConfiguration targetRootConfig = new YamlRootConfiguration(); + targetRootConfig.setDatabaseName(database.getName()); + targetRootConfig.setDataSources(dataSourceProps); + Collection<YamlRuleConfiguration> yamlRuleConfigurations = ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations()); + targetRootConfig.setRules(yamlRuleConfigurations); + return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig); + } + + @Override + public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) { + YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig; + if (null == yamlJobConfig.getJobId()) { + config.setJobId(generateJobId(config)); + } + if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) { + PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(), + config.getDataSourceConfiguration().getParameter()); + config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType()); + } + } + + private String generateJobId(final YamlCDCJobConfiguration config) { + CDCJobId jobId = new CDCJobId(config.getDatabase(), config.getSubscriptionName()); + return marshalJobId(jobId); + } + + @Override + public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { + // TODO to be implement + return null; + } + + @Override + public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) { + // TODO to be implement + return null; + } + + @Override + public 
PipelineJobConfiguration getJobConfiguration(final String jobId) { + return getJobConfiguration(getElasticJobConfigPOJO(jobId)); + } + + @Override + protected PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { + return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); + } + + @Override + public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) { + // TODO to be implement + } + + @Override + public PipelineJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) { + // TODO to be implement + return null; + } + + @Override + public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) { + // TODO to be implement + } + + @Override + protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) { Review Comment: It's better to put it together with jobId related methods, e.g. generateJobId ########## kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.data.pipeline.cdc.config.job; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; + +import java.util.List; + +/** + * CDC job configuration. + */ +@Getter +@RequiredArgsConstructor +public final class CDCJobConfiguration implements PipelineJobConfiguration { + + private final String jobId; + + private final String database; + + private final List<String> tableNames; + + private final String subscriptionName; + + private final String subscriptionMode; + + private final String sourceDatabaseType; + + private final PipelineDataSourceConfiguration dataSourceConfiguration; + + @Override + public int getJobShardingCount() { + return 1; Review Comment: Seems job sharding count won't be always `1`, does it need TODO? ########## kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPIImpl.java: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.data.pipeline.cdc.api.impl; + +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.shardingsphere.data.pipeline.api.config.PipelineTaskConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration; +import org.apache.shardingsphere.data.pipeline.api.config.process.PipelineProcessConfiguration; +import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext; +import org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext; +import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry; +import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration; +import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; +import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId; +import org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress; +import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo; +import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI; +import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType; +import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJob; +import 
org.apache.shardingsphere.data.pipeline.cdc.core.job.CDCJobId; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfiguration; +import org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurationSwapper; +import org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl; +import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext; +import org.apache.shardingsphere.data.pipeline.spi.job.JobType; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; +import org.apache.shardingsphere.infra.datanode.DataNode; +import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator; +import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; +import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration; +import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration; +import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper; +import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent; + +import javax.sql.DataSource; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +/** + * CDC job API impl. 
+ */ +public final class CDCJobAPIImpl extends AbstractPipelineJobAPIImpl implements CDCJobAPI { + + private final YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper(); + + private final YamlRuleConfigurationSwapperEngine ruleConfigSwapperEngine = new YamlRuleConfigurationSwapperEngine(); + + private final YamlPipelineDataSourceConfigurationSwapper pipelineDataSourceConfigSwapper = new YamlPipelineDataSourceConfigurationSwapper(); + + @Override + public String createJobAndStart(final CreateSubscriptionJobEvent event) { + YamlCDCJobConfiguration yamlJobConfig = new YamlCDCJobConfiguration(); + Map<String, String> tablesFirstDataNodesMap = new HashMap<>(event.getDataNodesMap().size(), 1); + for (Entry<String, List<DataNode>> entry : event.getDataNodesMap().entrySet()) { + tablesFirstDataNodesMap.put(entry.getKey(), new JobDataNodeLine(Collections.singleton(new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1)))).marshal()); + } + yamlJobConfig.setDatabase(event.getDatabase()); + yamlJobConfig.setTableNames(event.getSubscribeTableNames()); + yamlJobConfig.setSubscriptionName(event.getSubscriptionName()); + yamlJobConfig.setSubscriptionMode(event.getSubscriptionMode()); + ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(event.getDatabase()); + yamlJobConfig.setDataSourceConfiguration(pipelineDataSourceConfigSwapper.swapToYamlConfiguration(getDataSourceConfiguration(database))); + yamlJobConfig.setTablesFirstDataNodesMap(tablesFirstDataNodesMap); + extendYamlJobConfiguration(yamlJobConfig); + CDCJobConfiguration jobConfig = new YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig); + start(jobConfig); + return jobConfig.getJobId(); + } + + private ShardingSpherePipelineDataSourceConfiguration getDataSourceConfiguration(final ShardingSphereDatabase database) { + Map<String, Map<String, Object>> dataSourceProps = new HashMap<>(); + for 
(Entry<String, DataSource> entry : database.getResourceMetaData().getDataSources().entrySet()) { + dataSourceProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(DataSourcePropertiesCreator.create(entry.getValue()))); + } + YamlRootConfiguration targetRootConfig = new YamlRootConfiguration(); + targetRootConfig.setDatabaseName(database.getName()); + targetRootConfig.setDataSources(dataSourceProps); + Collection<YamlRuleConfiguration> yamlRuleConfigurations = ruleConfigSwapperEngine.swapToYamlRuleConfigurations(database.getRuleMetaData().getConfigurations()); + targetRootConfig.setRules(yamlRuleConfigurations); + return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig); + } + + @Override + public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration yamlJobConfig) { + YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig; + if (null == yamlJobConfig.getJobId()) { + config.setJobId(generateJobId(config)); + } + if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) { + PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(), + config.getDataSourceConfiguration().getParameter()); + config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType()); + } + } + + private String generateJobId(final YamlCDCJobConfiguration config) { + CDCJobId jobId = new CDCJobId(config.getDatabase(), config.getSubscriptionName()); + return marshalJobId(jobId); + } + + @Override + public PipelineTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) { + // TODO to be implement + return null; + } + + @Override + public PipelineProcessContext buildPipelineProcessContext(final PipelineJobConfiguration pipelineJobConfig) { + // TODO to be implement + return null; + } + + @Override + public 
PipelineJobConfiguration getJobConfiguration(final String jobId) { + return getJobConfiguration(getElasticJobConfigPOJO(jobId)); + } + + @Override + protected PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) { + return new YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter()); + } + + @Override + public void persistJobItemProgress(final PipelineJobItemContext jobItemContext) { + // TODO to be implement + } + + @Override + public PipelineJobItemProgress getJobItemProgress(final String jobId, final int shardingItem) { + // TODO to be implement + return null; + } + + @Override + public void updateJobItemStatus(final String jobId, final int shardingItem, final JobStatus status) { + // TODO to be implement + } + + @Override + protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) { + CDCJobId jobId = (CDCJobId) pipelineJobId; + String text = Joiner.on('|').join(jobId.getDatabaseName(), jobId.getSubscriptionName()); + return DigestUtils.md5Hex(text.getBytes(StandardCharsets.UTF_8)); + } + + @Override + protected PipelineJobInfo getJobInfo(final String jobId) { + // TODO to be implement + return null; + } + + @Override + protected String getJobClassName() { + return CDCJob.class.getName(); + } + + @Override + protected YamlPipelineJobConfiguration swapToYamlJobConfiguration(final PipelineJobConfiguration jobConfig) { Review Comment: It's better to put it together with jobConfiguration methods, e.g. getJobConfiguration ########## kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/listener/CDCContextManagerLifecycleListener.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.data.pipeline.cdc.core.listener; + +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.cdc.core.subscribe.CDCSubscriptionSubscribe; +import org.apache.shardingsphere.infra.config.mode.ModeConfiguration; +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener; + +/** + * CDC context manager lifecycle listener. + */ +@Slf4j +public final class CDCContextManagerLifecycleListener implements ContextManagerLifecycleListener { Review Comment: We could remove it and related configuration if event bus event is not required ########## proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.proxy.backend.handler.cdc; + +import com.google.common.base.Strings; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode; +import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseGenerator; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse; +import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext; +import org.apache.shardingsphere.infra.datanode.DataNode; +import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.cdc.CreateSubscriptionJobEvent; +import org.apache.shardingsphere.proxy.backend.context.ProxyContext; +import org.apache.shardingsphere.sharding.rule.ShardingRule; +import org.apache.shardingsphere.sharding.rule.TableRule; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * CDC backend handler. + */ +@Slf4j +public final class CDCBackendHandler { + + /** + * Create subscription. 
+ * + * @param request CDC request + * @return CDC response + */ + public CDCResponse createSubscription(final CDCRequest request) { + CreateSubscriptionRequest subscriptionRequest = request.getCreateSubscription(); + ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(subscriptionRequest.getDatabase()); + if (null == database) { + return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", subscriptionRequest.getDatabase())); + } + List<String> tableNames = new LinkedList<>(); + for (TableName each : subscriptionRequest.getTableNamesList()) { + tableNames.add(Strings.isNullOrEmpty(each.getSchema()) ? each.getName() : String.join(".", each.getSchema(), each.getName())); + } + Optional<ShardingRule> rule = database.getRuleMetaData().getRules().stream().filter(each -> each instanceof ShardingRule).map(each -> (ShardingRule) each).findFirst(); + if (!rule.isPresent()) { + return CDCResponseGenerator.failed(request.getRequestId(), CDCResponseErrorCode.SERVER_ERROR, "Not find sharding rule"); + } + Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>(); + for (String each : tableNames) { + actualDataNodesMap.put(each, getActualDataNodes(rule.get(), each)); + } + CreateSubscriptionJobEvent event = new CreateSubscriptionJobEvent(subscriptionRequest.getDatabase(), tableNames, subscriptionRequest.getSubscriptionName(), + subscriptionRequest.getSubscriptionMode().name(), actualDataNodesMap); + ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().post(event); Review Comment: Could we remove the event bus event and just invoke the underlying class directly? The proxy-backend module already depends on the cdc-core module. 
########## proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.proxy.backend.handler.cdc; + +import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse; +import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status; +import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext; +import org.apache.shardingsphere.infra.config.props.ConfigurationProperties; +import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType; +import org.apache.shardingsphere.infra.instance.InstanceContext; +import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; +import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; +import org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData; +import org.apache.shardingsphere.infra.rule.ShardingSphereRule; +import 
org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.metadata.MetaDataContexts; +import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.MockedStatic; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +public final class CDCBackendHandlerTest { + + private static MockedStatic<PipelineContext> pipelineContextMocked; + + private final CDCBackendHandler handler = new CDCBackendHandler(); + + @BeforeClass + public static void beforeClass() { + MetaDataContexts metaDataContexts = new MetaDataContexts(mock(MetaDataPersistService.class), + new ShardingSphereMetaData(getDatabases(), mock(ShardingSphereRuleMetaData.class), new ConfigurationProperties(new Properties()))); + ContextManager contextManager = new ContextManager(metaDataContexts, mock(InstanceContext.class)); + pipelineContextMocked = mockStatic(PipelineContext.class); + pipelineContextMocked.when(PipelineContext::getContextManager).thenReturn(contextManager); + } + + private static Map<String, ShardingSphereDatabase> getDatabases() { + ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, RETURNS_DEEP_STUBS); + when(database.getName()).thenReturn("sharding_db"); + when(database.getProtocolType()).thenReturn(new MySQLDatabaseType()); + Set<ShardingSphereRule> shardingRule = Collections.singleton(mock(ShardingSphereRule.class)); + when(database.getRuleMetaData().getRules()).thenReturn(shardingRule); + Map<String, ShardingSphereDatabase> result = new 
LinkedHashMap<>(1, 1); + result.put("sharding_db", database); + return result; + } + + @AfterClass + public static void afterClass() { + pipelineContextMocked.close(); + } + + @Test + public void assertCreateSubscriptionFailed() { + CDCRequest request = CDCRequest.newBuilder().setRequestId("1").setCreateSubscription(CreateSubscriptionRequest.newBuilder().setDatabase("none")).build(); + CDCResponse actualResponse = handler.createSubscription(request); + assertThat(actualResponse.getStatus(), is(Status.FAILED)); + } + + @Test + public void assertCreateSubscriptionSucceed() { + CDCRequest request = CDCRequest.newBuilder().setRequestId("1").setCreateSubscription(CreateSubscriptionRequest.newBuilder().setDatabase("sharding_db")).build(); + CDCResponse actualResponse = handler.createSubscription(request); + assertThat(actualResponse.getStatus(), is(Status.FAILED)); + } Review Comment: The assertion `is(Status.FAILED)` doesn't match the method name `assertCreateSubscriptionSucceed`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
