This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 063ea4b7cb3 Use PipelineMetaDataNode replace node path (#17871)
063ea4b7cb3 is described below
commit 063ea4b7cb3b17c67da09ce0c2fc6b7d107d730b
Author: azexcy <[email protected]>
AuthorDate: Sun May 29 10:58:50 2022 +0800
Use PipelineMetaDataNode replace node path (#17871)
---
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 11 ++------
.../core/api/impl/RuleAlteredJobAPIImpl.java | 6 ++--
.../core/metadata/node/PipelineMetaDataNode.java | 32 ++++++++++++++++++----
.../metadata/node/PipelineMetaDataNodeTest.java | 15 ++++++----
4 files changed, 42 insertions(+), 22 deletions(-)
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 1843cfe010c..9710ee95e56 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -25,7 +25,6 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
-import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.JobProgressYamlSwapper;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
@@ -97,23 +96,19 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
@Override
public void persistJobCheckResult(final String jobId, final boolean
checkSuccess) {
log.info("persist job check result '{}' for job {}", checkSuccess,
jobId);
- repository.persist(getCheckResultPath(jobId),
String.valueOf(checkSuccess));
- }
-
- private String getCheckResultPath(final String jobId) {
- return String.format("%s/%s/check/result",
DataPipelineConstants.DATA_PIPELINE_ROOT, jobId);
+
repository.persist(PipelineMetaDataNode.getScalingCheckResultPath(jobId),
String.valueOf(checkSuccess));
}
@Override
public Optional<Boolean> getJobCheckResult(final String jobId) {
- String data = repository.get(getCheckResultPath(jobId));
+ String data =
repository.get(PipelineMetaDataNode.getScalingCheckResultPath(jobId));
return Strings.isNullOrEmpty(data) ? Optional.empty() :
Optional.of(Boolean.parseBoolean(data));
}
@Override
public void deleteJob(final String jobId) {
log.info("delete job {}", jobId);
- repository.delete(String.format("%s/%s",
DataPipelineConstants.DATA_PIPELINE_ROOT, jobId));
+ repository.delete(PipelineMetaDataNode.getScalingJobPath(jobId));
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 701ad25c5d9..72ee8aba226 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -31,11 +31,11 @@ import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
-import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobCenter;
@@ -110,12 +110,12 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
log.info("Start scaling job by {}", jobConfig);
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI();
String jobId = jobConfig.getJobId();
- String jobConfigKey = String.format("%s/%s/config",
DataPipelineConstants.DATA_PIPELINE_ROOT, jobId);
+ String jobConfigKey =
PipelineMetaDataNode.getScalingJobConfigPath(jobId);
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("jobId already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
return Optional.of(jobId);
}
- repositoryAPI.persist(String.format("%s/%s",
DataPipelineConstants.DATA_PIPELINE_ROOT, jobId),
RuleAlteredJob.class.getName());
+ repositoryAPI.persist(PipelineMetaDataNode.getScalingJobPath(jobId),
RuleAlteredJob.class.getName());
repositoryAPI.persist(jobConfigKey, createJobConfigText(jobConfig));
return Optional.of(jobId);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index a3608ae4ec0..b31a0064bed 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.core.metadata.node;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
/**
* Scaling meta data node.
@@ -26,8 +27,6 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineMetaDataNode {
- public static final String ROOT_NODE = "scaling";
-
/**
* Get job config path.
*
@@ -35,16 +34,17 @@ public final class PipelineMetaDataNode {
* @return job config path.
*/
public static String getJobConfigPath(final String jobId) {
- return String.join("/", getScalingRootPath(), jobId, "config");
+ return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "config");
}
/**
* Get scaling root path.
*
+ * @param jobId job id.
* @return root path
*/
- public static String getScalingRootPath() {
- return "/" + ROOT_NODE;
+ public static String getScalingJobPath(final String jobId) {
+ return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId);
}
/**
@@ -65,6 +65,26 @@ public final class PipelineMetaDataNode {
* @return job offset path.
*/
public static String getScalingJobOffsetPath(final String jobId) {
- return String.join("/", getScalingRootPath(), jobId, "offset");
+ return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "offset");
+ }
+
+ /**
+ * Get scaling job config path.
+ *
+ * @param jobId job id.
+ * @return job config path.
+ */
+ public static String getScalingJobConfigPath(final String jobId) {
+ return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "config");
+ }
+
+ /**
+ * Get scaling job config path.
+ *
+ * @param jobId job id.
+ * @return job config path.
+ */
+ public static String getScalingCheckResultPath(final String jobId) {
+ return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobId, "check", "result");
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
index 5650f5387da..387a86dfbf3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNodeTest.java
@@ -24,11 +24,6 @@ import static org.junit.Assert.assertThat;
public final class PipelineMetaDataNodeTest {
- @Test
- public void assertGetScalingRootPath() {
- assertThat(PipelineMetaDataNode.getScalingRootPath(), is("/scaling"));
- }
-
@Test
public void assertGetJobConfigPath() {
String actualOffsetPath =
PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462");
@@ -36,4 +31,14 @@ public final class PipelineMetaDataNodeTest {
actualOffsetPath =
PipelineMetaDataNode.getScalingJobOffsetPath("0130317c30317c3054317c7368617264696e675f6462",
1);
assertThat(actualOffsetPath,
is("/scaling/0130317c30317c3054317c7368617264696e675f6462/offset/1"));
}
+
+ @Test
+ public void assertGetScalingJobConfigPath() {
+
assertThat(PipelineMetaDataNode.getScalingJobConfigPath("0130317c30317c3054317c7368617264696e675f6462"),
is("/scaling/0130317c30317c3054317c7368617264696e675f6462/config"));
+ }
+
+ @Test
+ public void assertGetScalingCheckResultPath() {
+
assertThat(PipelineMetaDataNode.getScalingCheckResultPath("0130317c30317c3054317c7368617264696e675f6462"),
is("/scaling/0130317c30317c3054317c7368617264696e675f6462/check/result"));
+ }
}