Caideyipi commented on code in PR #15275: URL: https://github.com/apache/iotdb/pull/15275#discussion_r2043935937
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java: ########## @@ -2074,6 +2074,16 @@ public SettableFuture<ConfigTaskResult> alterPipe(final AlterPipeStatement alter final Map<String, String> connectorAttributes; try { if (!alterPipeStatement.getExtractorAttributes().isEmpty()) { + // We simply don't allow to alter external sources Review Comment: The current implementation is "cannot alter the source if the original source is external", but it seems that the intended rule is "cannot alter the source type". As written, you can still alter an iotdb-source into another source type, yet you cannot alter the "mqtt-source"'s own variables... Not perfect ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/mqtt/MQTTExtractor.java: ########## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.pipe.extractor.mqtt; + +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; +import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; +import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator; +import org.apache.iotdb.pipe.api.PipeExtractor; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; + +import io.moquette.BrokerConstants; +import io.moquette.broker.Server; +import io.moquette.broker.config.IConfig; +import io.moquette.broker.config.MemoryConfig; +import io.moquette.broker.security.IAuthenticator; +import io.moquette.interception.InterceptHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +@TreeModel +@TableModel +public class MQTTExtractor implements PipeExtractor { + + private static final Logger LOGGER = LoggerFactory.getLogger(MQTTExtractor.class); + + protected String pipeName; + protected long creationTime; + protected PipeTaskMeta pipeTaskMeta; + protected final UnboundedBlockingPendingQueue<EnrichedEvent> pendingQueue = + new 
UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter()); + + protected IConfig config; + protected List<InterceptHandler> handlers; + protected IAuthenticator authenticator; + private final Server server = new Server(); + + protected final AtomicBoolean isClosed = new AtomicBoolean(false); + + @Override + public void validate(final PipeParameterValidator validator) throws Exception { + if (!validator + .getParameters() + .getBooleanOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY, + PipeExtractorConstant.EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY), + PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE)) { Review Comment: Should we set it to true as default value when the source is "mqtt-source"? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeBuilder.java: ########## @@ -84,4 +84,23 @@ public Map<Integer, PipeTask> build() throws IllegalPathException { } return consensusGroupIdToPipeTaskMap; } + + public Map<Integer, PipeTask> buildExternalPipeTasks() { + final Map<Integer, PipeTask> consensusGroupIdToPipeTaskMap = new HashMap<>(); + final PipeStaticMeta pipeStaticMeta = pipeMeta.getStaticMeta(); + final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta(); + + for (Map.Entry<Integer, PipeTaskMeta> consensusGroupIdToPipeTaskMeta : + pipeRuntimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) { + final int consensusGroupId = consensusGroupIdToPipeTaskMeta.getKey(); Review Comment: better rename to "taskId" ########## iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java: ########## @@ -147,6 +152,34 @@ public class PipeExtractorConstant { public static final String SOURCE_SKIP_IF_KEY = "source.skipif"; public static final String EXTRACTOR_IOTDB_SKIP_IF_NO_PRIVILEGES = "no-privileges"; + ////////////////// external sources //////////////// + public static final 
String EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY = "extractor.balance-strategy"; + public static final String EXTERNAL_SOURCE_BALANCE_STRATEGY_KEY = "source.balance-strategy"; + public static final String EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY = "proportion"; + public static final Set<String> CONNECTOR_LOAD_BALANCE_STRATEGY_SET = Review Comment: CONNECTOR? ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/util/ExternalLoadBalancer.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.procedure.impl.pipe.util; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant; +import org.apache.iotdb.confignode.manager.ConfigManager; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * The ExternalLoadBalancer is responsible for assigning parallel extraction tasks from an external + * source to available DataNodes in the cluster. + */ +public class ExternalLoadBalancer { + private final BalanceStrategy strategy; + + public ExternalLoadBalancer(final String balanceStrategy) { + switch (balanceStrategy) { + case PipeExtractorConstant.EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY: + this.strategy = new ProportionalBalanceStrategy(); + break; + default: + throw new IllegalArgumentException("Unknown load balance strategy: " + balanceStrategy); + } + } + + /** + * Balances the given number of parallel tasks across available nodes. 
+ * + * @param parallelCount number of external source tasks to distribute + * @param pipeStaticMeta metadata about the pipe extractor + * @param configManager reference to ConfigManager for cluster information + * @return a mapping from task index to leader node id + */ + public Map<Integer, Integer> balance( + final int parallelCount, + final PipeStaticMeta pipeStaticMeta, + final ConfigManager configManager) { + return strategy.balance(parallelCount, pipeStaticMeta, configManager); + } + + public interface BalanceStrategy { + Map<Integer, Integer> balance( + final int parallelCount, + final PipeStaticMeta pipeStaticMeta, + final ConfigManager configManager); + } + + public static class ProportionalBalanceStrategy implements BalanceStrategy { + @Override + public Map<Integer, Integer> balance( + final int parallelCount, + final PipeStaticMeta pipeStaticMeta, + final ConfigManager configManager) { + final Map<TConsensusGroupId, Integer> regionLeaderMap = + configManager.getLoadManager().getRegionLeaderMap(); + final Map<Integer, Integer> parallelAssignment = new HashMap<>(); + + // Check if the external extractor is single instance per node + if (pipeStaticMeta + .getExtractorParameters() + .getBooleanOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_KEY, + PipeExtractorConstant.EXTERNAL_SOURCE_SINGLE_INSTANCE_PER_NODE_KEY), + PipeExtractorConstant.EXTERNAL_EXTRACTOR_SINGLE_INSTANCE_PER_NODE_DEFAULT_VALUE)) { + final List<Integer> runningDataNodes = + configManager.getLoadManager().filterDataNodeThroughStatus(NodeStatus.Running).stream() + .sorted() + .collect(Collectors.toList()); + if (runningDataNodes.isEmpty()) { + throw new RuntimeException("No available datanode to assign tasks"); + } + final int numNodes = runningDataNodes.size(); + for (int i = 1; i <= Math.min(numNodes, parallelCount); i++) { + final int datanodeId = runningDataNodes.get(i - 1); + parallelAssignment.put(-i, datanodeId); + } + return 
parallelAssignment; + } + + // Count how many DataRegions each DataNode leads + final Map<Integer, Integer> leaderRegionId2DataRegionCountMap = new HashMap<>(); + regionLeaderMap.entrySet().stream() + .filter(e -> e.getKey().getType() == TConsensusGroupType.DataRegion && e.getValue() != -1) + .forEach( + e -> { + final int leaderId = e.getValue(); + leaderRegionId2DataRegionCountMap.put( + leaderId, leaderRegionId2DataRegionCountMap.getOrDefault(leaderId, 0) + 1); + }); + + // distribute evenly if no dataRegion exists + if (leaderRegionId2DataRegionCountMap.isEmpty()) { + List<Integer> runningDataNodes = + configManager.getLoadManager().filterDataNodeThroughStatus(NodeStatus.Running).stream() + .sorted() + .collect(Collectors.toList()); + if (runningDataNodes.isEmpty()) { + throw new RuntimeException("No available datanode to assign tasks"); + } + final int numNodes = runningDataNodes.size(); + for (int i = 1; i <= parallelCount; i++) { + final int nodeIndex = (i - 1) % numNodes; + final int datanodeId = runningDataNodes.get(nodeIndex); + parallelAssignment.put(-i, datanodeId); + } + return parallelAssignment; + } + + final int totalRegions = + leaderRegionId2DataRegionCountMap.values().stream().mapToInt(Integer::intValue).sum(); + + // Calculate exact and floor share of each leader + final Map<Integer, Double> leaderRegionId2ExactShareMap = new HashMap<>(); + final Map<Integer, Integer> leaderRegionId2AssignedCountMap = new HashMap<>(); + for (Map.Entry<Integer, Integer> entry : leaderRegionId2DataRegionCountMap.entrySet()) { + final double share = (parallelCount * entry.getValue()) / (double) totalRegions; + leaderRegionId2ExactShareMap.put(entry.getKey(), share); + leaderRegionId2AssignedCountMap.put(entry.getKey(), (int) Math.floor(share)); + } + + // Distribute remainder tasks based on largest fractional parts + final int remainder = + parallelCount + - leaderRegionId2AssignedCountMap.values().stream().mapToInt(Integer::intValue).sum(); + + final List<Integer> 
sortedLeaders = + leaderRegionId2ExactShareMap.keySet().stream() + .sorted( + (l1, l2) -> { + final double diff = + (leaderRegionId2ExactShareMap.get(l2) + - Math.floor(leaderRegionId2ExactShareMap.get(l2))) + - (leaderRegionId2ExactShareMap.get(l1) + - Math.floor(leaderRegionId2ExactShareMap.get(l1))); + return diff > 0 ? 1 : (diff < 0 ? -1 : Integer.compare(l1, l2)); + }) + .collect(Collectors.toList()); + for (int i = 0; i < remainder; i++) { + final int leaderId = sortedLeaders.get(i % sortedLeaders.size()); + leaderRegionId2AssignedCountMap.put( + leaderId, leaderRegionId2AssignedCountMap.get(leaderId) + 1); + } + + final List<Integer> stableLeaders = new ArrayList<>(leaderRegionId2AssignedCountMap.keySet()); + Collections.sort(stableLeaders); Review Comment: Seemingly no need to sort here... ########## iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/extractor/mqtt/MQTTExtractor.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.mqtt; + +import org.apache.iotdb.pipe.api.PipeExtractor; +import org.apache.iotdb.pipe.api.annotation.TableModel; +import org.apache.iotdb.pipe.api.annotation.TreeModel; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; + +@TreeModel +@TableModel +public class MQTTExtractor implements PipeExtractor { Review Comment: Better add some comments here ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java: ########## @@ -100,8 +108,49 @@ public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) { @Override public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { LOGGER.info("PipeMetaSyncProcedure: executeFromCalculateInfoForTask"); - - // Do nothing + // Re-balance the external source tasks here in case of any changes in the dataRegion + pipeTaskInfo + .get() + .getPipeMetaList() + .forEach( + pipeMeta -> { + if (pipeMeta.getStaticMeta().isSourceExternal()) { + final ExternalLoadBalancer loadBalancer = + new ExternalLoadBalancer( + pipeMeta + .getStaticMeta() + .getExtractorParameters() + .getStringOrDefault( + Arrays.asList( + PipeExtractorConstant.EXTERNAL_EXTRACTOR_BALANCE_STRATEGY_KEY, + PipeExtractorConstant.EXTERNAL_SOURCE_BALANCE_STRATEGY_KEY), + PipeExtractorConstant + .EXTERNAL_EXTRACTOR_BALANCE_PROPORTION_STRATEGY)); + + final int parallelism = + pipeMeta + .getStaticMeta() + .getExtractorParameters() + .getIntOrDefault( + Arrays.asList( + EXTERNAL_EXTRACTOR_PARALLELISM_KEY, + EXTERNAL_SOURCE_PARALLELISM_KEY), + EXTERNAL_EXTRACTOR_PARALLELISM_DEFAULT_VALUE); + loadBalancer + .balance( + parallelism, + pipeMeta.getStaticMeta(), + 
ConfigNode.getInstance().getConfigManager()) + .forEach( + (taskIndex, newLeader) -> { + pipeMeta + .getRuntimeMeta() + .getConsensusGroupId2TaskMetaMap() + .get(taskIndex) Review Comment: What if the runtime meta does not contain the task meta? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@iotdb.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org