This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 796d36be233 Keep stop_time and remove stop_time_millis in pipeline job
config properties (#29277)
796d36be233 is described below
commit 796d36be2338c66ba93f1730d1a7ef2e2e2f72de
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Dec 5 16:55:13 2023 +0800
Keep stop_time and remove stop_time_millis in pipeline job config
properties (#29277)
* Ignore interrupt exception
* Remove stop_time when job start
* Keep stop_time, remove stop_time_mills
* improve sleep interrupt
* Revoke sleep interrupt changes
---
.../data/pipeline/core/job/service/PipelineJobManager.java | 2 --
.../apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java | 3 +--
.../data/pipeline/cdc/util/CDCSchemaTableUtils.java | 9 ++++++++-
.../data/pipeline/cdc/util/CDCSchemaTableUtilsTest.java | 2 +-
4 files changed, 10 insertions(+), 6 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 2d98b49be6a..572d620b1db 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -96,7 +96,6 @@ public final class PipelineJobManager {
jobConfigPOJO.setDisabled(false);
jobConfigPOJO.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
jobConfigPOJO.getProps().remove("stop_time");
- jobConfigPOJO.getProps().remove("stop_time_millis");
jobConfigPOJO.getProps().setProperty("run_count",
String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count",
"0")) + 1));
String barrierEnablePath =
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
pipelineDistributedBarrier.register(barrierEnablePath,
jobConfigPOJO.getShardingTotalCount());
@@ -147,7 +146,6 @@ public final class PipelineJobManager {
}
jobConfigPOJO.setDisabled(true);
jobConfigPOJO.getProps().setProperty("stop_time",
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
- jobConfigPOJO.getProps().setProperty("stop_time_millis",
String.valueOf(System.currentTimeMillis()));
String barrierPath =
PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
pipelineDistributedBarrier.register(barrierPath,
jobConfigPOJO.getShardingTotalCount());
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index ea30013cd48..5b137ca5988 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -225,7 +225,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
jobConfigPOJO.setDisabled(false);
jobConfigPOJO.getProps().setProperty("start_time_millis",
String.valueOf(System.currentTimeMillis()));
- jobConfigPOJO.getProps().remove("stop_time_millis");
+ jobConfigPOJO.getProps().remove("stop_time");
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
}
@@ -238,7 +238,6 @@ public final class CDCJobAPI implements TransmissionJobAPI {
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
jobConfigPOJO.setDisabled(true);
jobConfigPOJO.getProps().setProperty("stop_time",
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter()));
- jobConfigPOJO.getProps().setProperty("stop_time_millis",
String.valueOf(System.currentTimeMillis()));
PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobConfigPOJO.getJobName())).updateJobConfiguration(jobConfigPOJO);
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
index ef554a9e86a..ff81e45f019 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtils.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.infra.database.core.metadata.database.system.Di
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.exception.SchemaNotFoundException;
+import org.apache.shardingsphere.infra.exception.TableNotExistsException;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
@@ -57,13 +58,19 @@ public final class CDCSchemaTableUtils {
return parseTableExpressionWithAllTables(database, systemSchemas);
}
Map<String, Set<String>> result = new HashMap<>();
+ DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData();
for (SchemaTable each : schemaTables) {
if ("*".equals(each.getSchema())) {
result.putAll(parseTableExpressionWithAllSchema(database,
systemSchemas, each));
} else if ("*".equals(each.getTable())) {
result.putAll(parseTableExpressionWithAllTable(database,
each));
} else {
- result.computeIfAbsent(each.getSchema(), ignored -> new
HashSet<>()).add(each.getTable());
+ String schemaName = each.getSchema();
+ if (dialectDatabaseMetaData.getDefaultSchema().isPresent() &&
schemaName.isEmpty()) {
+ schemaName =
dialectDatabaseMetaData.getDefaultSchema().get();
+ }
+
ShardingSpherePreconditions.checkNotNull(database.getSchema(schemaName).getTable(each.getTable()),
() -> new TableNotExistsException(each.getTable()));
+ result.computeIfAbsent(schemaName, ignored -> new
HashSet<>()).add(each.getTable());
}
}
return result;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilsTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilsTest.java
index c0476164c42..220be845acc 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilsTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilsTest.java
@@ -55,7 +55,7 @@ class CDCSchemaTableUtilsTest {
assertThat(actual, is(expected));
schemaTables =
Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
actual = CDCSchemaTableUtils.parseTableExpressionWithSchema(database,
schemaTables);
- expected = Collections.singletonMap("",
Collections.singleton("t_order"));
+ expected = Collections.singletonMap("public",
Collections.singleton("t_order"));
assertThat(actual, is(expected));
schemaTables =
Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
actual = CDCSchemaTableUtils.parseTableExpressionWithSchema(database,
schemaTables);