This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 796d36be233 Keep stop_time and remove stop_time_millis in pipeline job config properties (#29277)
796d36be233 is described below

commit 796d36be2338c66ba93f1730d1a7ef2e2e2f72de
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Dec 5 16:55:13 2023 +0800

    Keep stop_time and remove stop_time_millis in pipeline job config properties (#29277)
    
    * Ignore interrupt exception
    
    * Remove stop_time when job start
    
    * Keep stop_time, remove stop_time_millis
    
    * improve sleep interrupt
    
    * Revoke sleep interrupt changes
---
 .../data/pipeline/core/job/service/PipelineJobManager.java       | 2 --
 .../apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java   | 3 +--
 .../data/pipeline/cdc/util/CDCSchemaTableUtils.java              | 9 ++++++++-
 .../data/pipeline/cdc/util/CDCSchemaTableUtilsTest.java          | 2 +-
 4 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 2d98b49be6a..572d620b1db 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -96,7 +96,6 @@ public final class PipelineJobManager {
         jobConfigPOJO.setDisabled(false);
         jobConfigPOJO.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
         jobConfigPOJO.getProps().remove("stop_time");
-        jobConfigPOJO.getProps().remove("stop_time_millis");
         jobConfigPOJO.getProps().setProperty("run_count", 
String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count",
 "0")) + 1));
         String barrierEnablePath = 
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
         pipelineDistributedBarrier.register(barrierEnablePath, 
jobConfigPOJO.getShardingTotalCount());
@@ -147,7 +146,6 @@ public final class PipelineJobManager {
         }
         jobConfigPOJO.setDisabled(true);
         jobConfigPOJO.getProps().setProperty("stop_time", 
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
-        jobConfigPOJO.getProps().setProperty("stop_time_millis", 
String.valueOf(System.currentTimeMillis()));
         String barrierPath = 
PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
         pipelineDistributedBarrier.register(barrierPath, 
jobConfigPOJO.getShardingTotalCount());
         
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index ea30013cd48..5b137ca5988 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -225,7 +225,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         jobConfigPOJO.setDisabled(false);
         jobConfigPOJO.getProps().setProperty("start_time_millis", 
String.valueOf(System.currentTimeMillis()));
-        jobConfigPOJO.getProps().remove("stop_time_millis");
+        jobConfigPOJO.getProps().remove("stop_time");
         
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
     }
     
@@ -238,7 +238,6 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         JobConfigurationPOJO jobConfigPOJO = 
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         jobConfigPOJO.setDisabled(true);
         jobConfigPOJO.getProps().setProperty("stop_time", 
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
-        jobConfigPOJO.getProps().setProperty("stop_time_millis", 
String.valueOf(System.currentTimeMillis()));
         
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
     }
     
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
index ef554a9e86a..ff81e45f019 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.infra.database.core.metadata.database.system.Di
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.exception.SchemaNotFoundException;
+import org.apache.shardingsphere.infra.exception.TableNotExistsException;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
@@ -57,13 +58,19 @@ public final class CDCSchemaTableUtils {
             return parseTableExpressionWithAllTables(database, systemSchemas);
         }
         Map<String, Set<String>> result = new HashMap<>();
+        DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData();
         for (SchemaTable each : schemaTables) {
             if ("*".equals(each.getSchema())) {
                 result.putAll(parseTableExpressionWithAllSchema(database, 
systemSchemas, each));
             } else if ("*".equals(each.getTable())) {
                 result.putAll(parseTableExpressionWithAllTable(database, 
each));
             } else {
-                result.computeIfAbsent(each.getSchema(), ignored -> new 
HashSet<>()).add(each.getTable());
+                String schemaName = each.getSchema();
+                if (dialectDatabaseMetaData.getDefaultSchema().isPresent() && 
schemaName.isEmpty()) {
+                    schemaName = 
dialectDatabaseMetaData.getDefaultSchema().get();
+                }
+                
ShardingSpherePreconditions.checkNotNull(database.getSchema(schemaName).getTable(each.getTable()),
 () -> new TableNotExistsException(each.getTable()));
+                result.computeIfAbsent(schemaName, ignored -> new 
HashSet<>()).add(each.getTable());
             }
         }
         return result;
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilsTest.java b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilsTest.java
index c0476164c42..220be845acc 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilsTest.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilsTest.java
@@ -55,7 +55,7 @@ class CDCSchemaTableUtilsTest {
         assertThat(actual, is(expected));
         schemaTables = 
Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
         actual = CDCSchemaTableUtils.parseTableExpressionWithSchema(database, 
schemaTables);
-        expected = Collections.singletonMap("", 
Collections.singleton("t_order"));
+        expected = Collections.singletonMap("public", 
Collections.singleton("t_order"));
         assertThat(actual, is(expected));
         schemaTables = 
Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
         actual = CDCSchemaTableUtils.parseTableExpressionWithSchema(database, 
schemaTables);

Reply via email to