Caideyipi commented on code in PR #15275:
URL: https://github.com/apache/iotdb/pull/15275#discussion_r2043935937


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -2074,6 +2074,16 @@ public SettableFuture<ConfigTaskResult> alterPipe(final 
AlterPipeStatement alter
     final Map<String, String> connectorAttributes;
     try {
       if (!alterPipeStatement.getExtractorAttributes().isEmpty()) {
+        // We simply don't allow to alter external sources

Review Comment:
   The current implementation means "the source cannot be altered if the
original source is external", but the intended rule seems to be "the source
type cannot be altered". As written, you can alter an iotdb-source into another
source type, yet you cannot alter the "mqtt-source"'s own parameters. Not ideal.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTExtractor.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.extractor.mqtt;
+
+import 
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
+import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator;
+import org.apache.iotdb.pipe.api.PipeExtractor;
+import org.apache.iotdb.pipe.api.annotation.TableModel;
+import org.apache.iotdb.pipe.api.annotation.TreeModel;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
+
+import io.moquette.BrokerConstants;
+import io.moquette.broker.Server;
+import io.moquette.broker.config.IConfig;
+import io.moquette.broker.config.MemoryConfig;
+import io.moquette.broker.security.IAuthenticator;
+import io.moquette.interception.InterceptHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@TreeModel
+@TableModel
+public class MQTTExtractor implements PipeExtractor {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MQTTExtractor.class);
+
+  protected String pipeName;
+  protected long creationTime;
+  protected PipeTaskMeta pipeTaskMeta;
+  protected final UnboundedBlockingPendingQueue<EnrichedEvent> pendingQueue =
+      new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter());
+
+  protected IConfig config;
+  protected List<InterceptHandler> handlers;
+  protected IAuthenticator authenticator;
+  private final Server server = new Server();
+
+  protected final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+  @Override
+  public void validate(final PipeParameterValidator validator) throws 
Exception {
+    if (!validator
+        .getParameters()
+        .getBooleanOrDefault(
+            Arrays.asList(
+                
PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY,
+                
PipeExtractorConstant.EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY),
+            
PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE))
 {

Review Comment:
   Should the default value be true when the source is "mqtt-source"?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java:
##########
@@ -84,4 +84,23 @@ public Map<Integer, PipeTask> build() throws 
IllegalPathException {
     }
     return consensusGroupIdToPipeTaskMap;
   }
+
+  public Map<Integer, PipeTask> buildExternalPipeTasks() {
+    final Map<Integer, PipeTask> consensusGroupIdToPipeTaskMap = new 
HashMap<>();
+    final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta();
+    final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
+
+    for (Map.Entry<Integer, PipeTaskMeta> consensusGroupIdToPipeTaskMeta :
+        pipeRuntimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) {
+      final int consensusGroupId = consensusGroupIdToPipeTaskMeta.getKey();

Review Comment:
   Better to rename this to "taskId".



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java:
##########
@@ -147,6 +152,34 @@ public class PipeExtractorConstant {
   public static final String SOURCE_SKIP_IF_KEY = "source.skipif";
   public static final String EXTRACTOR_IOTDB_SKIP_IF_NO_PRIVILEGES = 
"no-privileges";
 
+  ////////////////// external sources ////////////////
+  public static final String EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY = 
"extractor.balance-strategy";
+  public static final String EXTERNAL_SOURCE_BALANCE_STRATEGY_KEY = 
"source.balance-strategy";
+  public static final String EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY = 
"proportion";
+  public static final Set<String> CONNECTOR_LOAD_BALANCE_STRATEGY_SET =

Review Comment:
   CONNECTOR?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/ExternalLoadBalancer.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.pipe.util;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * The ExternalLoadBalancer is responsible for assigning parallel extraction 
tasks from an external
+ * source to available DataNodes in the cluster.
+ */
+public class ExternalLoadBalancer {
+  private final BalanceStrategy strategy;
+
+  public ExternalLoadBalancer(final String balanceStrategy) {
+    switch (balanceStrategy) {
+      case 
PipeExtractorConstant.EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY:
+        this.strategy = new ProportionalBalanceStrategy();
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown load balance strategy: " + 
balanceStrategy);
+    }
+  }
+
+  /**
+   * Balances the given number of parallel tasks across available nodes.
+   *
+   * @param parallelCount number of external source tasks to distribute
+   * @param pipeStaticMeta metadata about the pipe extractor
+   * @param configManager reference to ConfigManager for cluster information
+   * @return a mapping from task index to leader node id
+   */
+  public Map<Integer, Integer> balance(
+      final int parallelCount,
+      final PipeStaticMeta pipeStaticMeta,
+      final ConfigManager configManager) {
+    return strategy.balance(parallelCount, pipeStaticMeta, configManager);
+  }
+
+  public interface BalanceStrategy {
+    Map<Integer, Integer> balance(
+        final int parallelCount,
+        final PipeStaticMeta pipeStaticMeta,
+        final ConfigManager configManager);
+  }
+
+  public static class ProportionalBalanceStrategy implements BalanceStrategy {
+    @Override
+    public Map<Integer, Integer> balance(
+        final int parallelCount,
+        final PipeStaticMeta pipeStaticMeta,
+        final ConfigManager configManager) {
+      final Map<TConsensusGroupId, Integer> regionLeaderMap =
+          configManager.getLoadManager().getRegionLeaderMap();
+      final Map<Integer, Integer> parallelAssignment = new HashMap<>();
+
+      // Check if the external extractor is single instance per node
+      if (pipeStaticMeta
+          .getExtractorParameters()
+          .getBooleanOrDefault(
+              Arrays.asList(
+                  
PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY,
+                  
PipeExtractorConstant.EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY),
+              
PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE))
 {
+        final List<Integer> runningDataNodes =
+            
configManager.getLoadManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+                .sorted()
+                .collect(Collectors.toList());
+        if (runningDataNodes.isEmpty()) {
+          throw new RuntimeException("No available datanode to assign tasks");
+        }
+        final int numNodes = runningDataNodes.size();
+        for (int i = 1; i <= Math.min(numNodes, parallelCount); i++) {
+          final int datanodeId = runningDataNodes.get(i - 1);
+          parallelAssignment.put(-i, datanodeId);
+        }
+        return parallelAssignment;
+      }
+
+      // Count how many DataRegions each DataNode leads
+      final Map<Integer, Integer> leaderRegionId2DataRegionCountMap = new 
HashMap<>();
+      regionLeaderMap.entrySet().stream()
+          .filter(e -> e.getKey().getType() == TConsensusGroupType.DataRegion 
&& e.getValue() != -1)
+          .forEach(
+              e -> {
+                final int leaderId = e.getValue();
+                leaderRegionId2DataRegionCountMap.put(
+                    leaderId, 
leaderRegionId2DataRegionCountMap.getOrDefault(leaderId, 0) + 1);
+              });
+
+      // distribute evenly if no dataRegion exists
+      if (leaderRegionId2DataRegionCountMap.isEmpty()) {
+        List<Integer> runningDataNodes =
+            
configManager.getLoadManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
+                .sorted()
+                .collect(Collectors.toList());
+        if (runningDataNodes.isEmpty()) {
+          throw new RuntimeException("No available datanode to assign tasks");
+        }
+        final int numNodes = runningDataNodes.size();
+        for (int i = 1; i <= parallelCount; i++) {
+          final int nodeIndex = (i - 1) % numNodes;
+          final int datanodeId = runningDataNodes.get(nodeIndex);
+          parallelAssignment.put(-i, datanodeId);
+        }
+        return parallelAssignment;
+      }
+
+      final int totalRegions =
+          
leaderRegionId2DataRegionCountMap.values().stream().mapToInt(Integer::intValue).sum();
+
+      // Calculate exact and floor share of each leader
+      final Map<Integer, Double> leaderRegionId2ExactShareMap = new 
HashMap<>();
+      final Map<Integer, Integer> leaderRegionId2AssignedCountMap = new 
HashMap<>();
+      for (Map.Entry<Integer, Integer> entry : 
leaderRegionId2DataRegionCountMap.entrySet()) {
+        final double share = (parallelCount * entry.getValue()) / (double) 
totalRegions;
+        leaderRegionId2ExactShareMap.put(entry.getKey(), share);
+        leaderRegionId2AssignedCountMap.put(entry.getKey(), (int) 
Math.floor(share));
+      }
+
+      // Distribute remainder tasks based on largest fractional parts
+      final int remainder =
+          parallelCount
+              - 
leaderRegionId2AssignedCountMap.values().stream().mapToInt(Integer::intValue).sum();
+
+      final List<Integer> sortedLeaders =
+          leaderRegionId2ExactShareMap.keySet().stream()
+              .sorted(
+                  (l1, l2) -> {
+                    final double diff =
+                        (leaderRegionId2ExactShareMap.get(l2)
+                                - 
Math.floor(leaderRegionId2ExactShareMap.get(l2)))
+                            - (leaderRegionId2ExactShareMap.get(l1)
+                                - 
Math.floor(leaderRegionId2ExactShareMap.get(l1)));
+                    return diff > 0 ? 1 : (diff < 0 ? -1 : Integer.compare(l1, 
l2));
+                  })
+              .collect(Collectors.toList());
+      for (int i = 0; i < remainder; i++) {
+        final int leaderId = sortedLeaders.get(i % sortedLeaders.size());
+        leaderRegionId2AssignedCountMap.put(
+            leaderId, leaderRegionId2AssignedCountMap.get(leaderId) + 1);
+      }
+
+      final List<Integer> stableLeaders = new 
ArrayList<>(leaderRegionId2AssignedCountMap.keySet());
+      Collections.sort(stableLeaders);

Review Comment:
   There seems to be no need to sort here...



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/extractor/mqtt/MQTTExtractor.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.mqtt;
+
+import org.apache.iotdb.pipe.api.PipeExtractor;
+import org.apache.iotdb.pipe.api.annotation.TableModel;
+import org.apache.iotdb.pipe.api.annotation.TreeModel;
+import 
org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+
+@TreeModel
+@TableModel
+public class MQTTExtractor implements PipeExtractor {

Review Comment:
   It would be better to add some comments here.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java:
##########
@@ -100,8 +108,49 @@ public boolean 
executeFromValidateTask(ConfigNodeProcedureEnv env) {
   @Override
   public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) {
     LOGGER.info("PipeMetaSyncProcedure: executeFromCalculateInfoForTask");
-
-    // Do nothing
+    // Re-balance the external source tasks here in case of any changes in the 
dataRegion
+    pipeTaskInfo
+        .get()
+        .getPipeMetaList()
+        .forEach(
+            pipeMeta -> {
+              if (pipeMeta.getStaticMeta().isSourceExternal()) {
+                final ExternalLoadBalancer loadBalancer =
+                    new ExternalLoadBalancer(
+                        pipeMeta
+                            .getStaticMeta()
+                            .getExtractorParameters()
+                            .getStringOrDefault(
+                                Arrays.asList(
+                                    
PipeExtractorConstant.EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY,
+                                    
PipeExtractorConstant.EXTERNAL_SOURCE_BALANCE_STRATEGY_KEY),
+                                PipeExtractorConstant
+                                    
.EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY));
+
+                final int parallelism =
+                    pipeMeta
+                        .getStaticMeta()
+                        .getExtractorParameters()
+                        .getIntOrDefault(
+                            Arrays.asList(
+                                EXTERNAL_EXTRACTOR_PARALLELISM_KEY,
+                                EXTERNAL_SOURCE_PARALLELISM_KEY),
+                            EXTERNAL_EXTRACTOR_PARALLELISM_DEFAULT_VALUE);
+                loadBalancer
+                    .balance(
+                        parallelism,
+                        pipeMeta.getStaticMeta(),
+                        ConfigNode.getInstance().getConfigManager())
+                    .forEach(
+                        (taskIndex, newLeader) -> {
+                          pipeMeta
+                              .getRuntimeMeta()
+                              .getConsensusGroupId2TaskMetaMap()
+                              .get(taskIndex)

Review Comment:
   What if the runtime meta does not contain the task meta?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to