sandynz commented on code in PR #22187:
URL: https://github.com/apache/shardingsphere/pull/22187#discussion_r1023454607


##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineDistributedBarrier.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api;

Review Comment:
   The SPI interface could be put in the `spi` package.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineDistributedBarrier.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api;
+
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pipeline distributed barrier.
+ */
+@SingletonSPI
+public interface PipelineDistributedBarrier extends RequiredSPI, TypedSPI {

Review Comment:
   `TypedSPI` is not necessary; using only `RequiredSPI` could simplify API usage.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineDistributedBarrierFactory.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
+
+/**
+ * Pipeline distributed barrier factory.
+ */
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class PipelineDistributedBarrierFactory {
+    
+    static {
+        ShardingSphereServiceLoader.register(PipelineDistributedBarrier.class);
+    }
+    
+    /**
+     * Get instance of pipeline distribute barrier.
+     *
+     * @param type type
+     * @return got instance
+     */
+    public static PipelineDistributedBarrier getInstance(final String type) {
+        return TypedSPIRegistry.getRegisteredService(PipelineDistributedBarrier.class, type);
+    }

Review Comment:
   1. The `type` parameter could be removed.
   
   2. `RequiredSPIRegistry` could be used in place of `TypedSPIRegistry`.
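
   Both points can be sketched together. The following is a minimal, self-contained illustration, not the ShardingSphere implementation: `RequiredService`, `Barrier`, `RepositoryBarrier`, `FixtureBarrier`, and `getRequired` are hypothetical stand-ins for the real SPI types and registry, and the selection rule (prefer the implementation whose `isDefault()` returns true, otherwise the first one registered) is an assumption made for the sketch.

```java
import java.util.Arrays;
import java.util.Collection;

// Self-contained sketch: every type here is a simplified stand-in, not a ShardingSphere class.
final class BarrierFactorySketch {

    /** Stand-in for a "required" SPI: one implementation is always expected to be available. */
    interface RequiredService {

        /** Marks the implementation to prefer when several are registered. */
        default boolean isDefault() {
            return false;
        }
    }

    /** Stand-in for the barrier SPI from the diff, reduced to a marker interface. */
    interface Barrier extends RequiredService {
    }

    static final class RepositoryBarrier implements Barrier {

        @Override
        public boolean isDefault() {
            return true;
        }
    }

    static final class FixtureBarrier implements Barrier {
    }

    /** Hypothetical registry lookup: the default implementation if present, else the first one. */
    static <T extends RequiredService> T getRequired(final Collection<T> services) {
        if (services.isEmpty()) {
            throw new IllegalStateException("No implementation registered");
        }
        return services.stream().filter(RequiredService::isDefault).findFirst().orElseGet(() -> services.iterator().next());
    }

    /** Factory method without a type parameter. */
    static Barrier getInstance() {
        return getRequired(Arrays.<Barrier>asList(new FixtureBarrier(), new RepositoryBarrier()));
    }
}
```

   With no `type` argument, callers do not need to know which implementation is registered; the choice stays inside the registry lookup.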
   



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineDistributedBarrierFactory.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api;

Review Comment:
   The SPI factory could be put in the `spi` package.



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/ZookeeperPipelineDistributedBarrier.java:
##########
@@ -38,9 +39,7 @@
  * Pipeline distributed barrier.
  */
 @Slf4j
-public final class PipelineDistributedBarrier {
-    
-    private static final PipelineDistributedBarrier INSTANCE = new PipelineDistributedBarrier();
+public final class ZookeeperPipelineDistributedBarrier implements PipelineDistributedBarrier {

Review Comment:
   It's better not to add the `Zookeeper` prefix to the class name, since it could support all persistence repositories later.



##########
kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineDistributedBarrier:
##########
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.core.api.impl.ZookeeperPipelineDistributedBarrier

Review Comment:
   A newline is needed at the end of the file.



##########
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java:
##########
@@ -45,7 +46,7 @@ public void assertRegisterAndRemove() throws NoSuchFieldException, IllegalAccess
         String jobId = "j0130317c3054317c7363616c696e675f626d73716c";
         PersistRepository repository = PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
         repository.persist(PipelineMetaDataNode.getJobRootPath(jobId), "");
-        PipelineDistributedBarrier instance = PipelineDistributedBarrier.getInstance();
+        PipelineDistributedBarrier instance = PipelineContext.getPipelineDistributedBarrier();
         String parentPath = "/barrier";
         instance.register(parentPath, 1);
         Map countDownLatchMap = ReflectionUtil.getFieldValue(instance, "countDownLatchMap", Map.class);

Review Comment:
   It looks like the unit test failed because of `countDownLatchMap`.
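
   The comment does not say why the lookup fails. One possible cause, offered only as an assumption, is that the instance returned is no longer the class that declares `countDownLatchMap` (for example, a test fixture), in which case a reflective field lookup throws `NoSuchFieldException`. The sketch below uses hypothetical stand-in classes (`RealBarrier`, `FixtureBarrier`) and plain `java.lang.reflect` rather than the project's `ReflectionUtil`.

```java
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Self-contained sketch: the classes are hypothetical stand-ins, not the classes from the PR.
final class ReflectionLookupSketch {

    interface Barrier {
    }

    /** Stand-in for an implementation that keeps its latches in a private map. */
    static final class RealBarrier implements Barrier {

        private final Map<String, Object> countDownLatchMap = new ConcurrentHashMap<>();
    }

    /** Stand-in for a fixture that declares no such field. */
    static final class FixtureBarrier implements Barrier {
    }

    /** Returns true if the runtime class of the target declares a field with the given name. */
    static boolean hasDeclaredField(final Object target, final String fieldName) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            return null != field;
        } catch (final NoSuchFieldException ex) {
            // getDeclaredField throws when the class itself does not declare the field.
            return false;
        }
    }
}
```

   If this is the cause, the test depends on which implementation the lookup returns, not only on the interface.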



##########
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineDistributedBarrier;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.TimeUnit;
+
+public final class FixturePipelineDistributedBarrier implements PipelineDistributedBarrier {
+    
+    @Override
+    public void register(final String barrierPath, final int totalCount) {
+    }
+    
+    @Override
+    public void persistEphemeralChildrenNode(final String barrierPath, final int shardingItem) {
+    }
+    
+    @Override
+    public void unregister(final String barrierPath) {
+    }
+    
+    @Override
+    public boolean await(final String barrierPath, final long timeout, final TimeUnit timeUnit) {
+        return false;
+    }

Review Comment:
   Does `return false;` affect the unit test? It seems it should be `return true;`.
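
   For context on why the return value matters: timed waits in `java.util.concurrent` conventionally return `true` when the condition was met and `false` when the timeout elapsed. The sketch below shows this with `CountDownLatch.await(long, TimeUnit)`. It assumes, without confirmation from the diff, that callers of the barrier's `await` read `false` the same way; `awaitLatch` and `describe` are hypothetical helpers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Self-contained sketch of the timed-wait convention; not code from the PR.
final class AwaitResultSketch {

    /** Waits on a fresh latch with the given pending count; true means the count reached zero. */
    static boolean awaitLatch(final int pendingCount, final long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(pendingCount);
        try {
            // Returns true if the count is zero, false if the timeout elapses first.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Hypothetical caller-side interpretation of the result. */
    static String describe(final boolean awaitResult) {
        return awaitResult ? "barrier released" : "timed out";
    }
}
```

   Under that assumption, a fixture that always returns `false` would look like a permanent timeout to any caller that checks the result.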



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java:
##########
@@ -56,8 +56,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
     
     protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
-    private final PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance();
-    

Review Comment:
   Could we keep this field and use `PipelineDistributedBarrierFactory.getInstance()` in place of `PipelineDistributedBarrier.getInstance()`?



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineContext.java:
##########
@@ -53,6 +57,24 @@ public static void initModeConfig(final ModeConfiguration modeConfig) {
         PipelineContext.modeConfig = modeConfig;
     }
     
+    /**
+     * Get pipeline distributed barrier.
+     *
+     * @return pipeline distributed barrier
+     */
+    public static PipelineDistributedBarrier getPipelineDistributedBarrier() {
+        return pipelineDistributedBarrier;
+    }
+    
+    /**
+     * Initialize pipeline distributed barrier.
+     *
+     * @param type type
+     */
+    public static void initPipelineDistributedBarrier(final String type) {
+        pipelineDistributedBarrier = PipelineDistributedBarrierFactory.getInstance(type);
+    }

Review Comment:
   It could be removed



##########
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDistributedBarrier.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.fixture;
+
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineDistributedBarrier;
+import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.concurrent.TimeUnit;
+
+public final class FixturePipelineDistributedBarrier implements PipelineDistributedBarrier {
+    
+    @Override
+    public void register(final String barrierPath, final int totalCount) {
+    }
+    
+    @Override
+    public void persistEphemeralChildrenNode(final String barrierPath, final int shardingItem) {
+    }
+    
+    @Override
+    public void unregister(final String barrierPath) {
+    }
+    
+    @Override
+    public boolean await(final String barrierPath, final long timeout, final TimeUnit timeUnit) {
+        return false;
+    }
+    
+    @Override
+    public void notifyChildrenNodeCountCheck(final DataChangedEvent event) {
+    }
+    
+    @Override
+    public String getType() {
+        return "FIXTURE";
+    }

Review Comment:
   `getType()` could be replaced with `isDefault()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
