Caideyipi commented on code in PR #11639: URL: https://github.com/apache/iotdb/pull/11639#discussion_r1410293701
########## iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeConfigSubtask.java: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.task.subtask; + +import org.apache.iotdb.commons.pipe.execution.scheduler.PipeConfigSubtaskScheduler; +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.schema.IoTDBSchemaConnector; +import org.apache.iotdb.commons.pipe.task.DecoratingLock; +import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class PipeConfigSubtask + implements FutureCallback<Boolean>, Callable<Boolean>, AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfigSubtask.class); + + // Used for identifying the subtask + private final String taskID; + + // For thread pool to execute subtasks + private ListeningExecutorService subtaskWorkerThreadPoolExecutor; + + // For controlling the subtask execution + private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true); + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private PipeConfigSubtaskScheduler subtaskScheduler; + + // For fail-over + public static final int MAX_RETRY_TIMES = 5; + private final AtomicInteger retryCount = new AtomicInteger(0); + private Event lastEvent; + + private final PipeConnector outputPipeConnector; + + // For thread pool to execute callbacks + private final DecoratingLock callbackDecoratingLock = new DecoratingLock(); + private ExecutorService subtaskCallbackListeningExecutor; + + // For controlling subtask submitting, making sure that a subtask is submitted to only one thread + // at a time + private volatile boolean isSubmitted = false; + + public PipeConfigSubtask( + String taskID, + Map<String, String> extractorAttributes, + Map<String, String> connectorAttributes) + throws Exception { + this.taskID = taskID; + this.outputPipeConnector = new IoTDBSchemaConnector(); + // This connector takes the responsibility of both extractor and connector, + // so we need to merge the attributes of extractor and connector + Map<String, String> allAttributes = new HashMap<>(); + allAttributes.putAll(extractorAttributes); + allAttributes.putAll(connectorAttributes); + this.outputPipeConnector.validate( + new PipeParameterValidator(new PipeParameters(allAttributes))); + } + + public void bindExecutors( + ListeningExecutorService subtaskWorkerThreadPoolExecutor, + ExecutorService subtaskCallbackListeningExecutor, + PipeConfigSubtaskScheduler subtaskScheduler) { + this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor; + this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor; + this.subtaskScheduler = subtaskScheduler; + } + + @Override + public Boolean call() throws Exception { + boolean hasAtLeastOneEventProcessed = false; + + try { + // If the scheduler allows to schedule, then try to consume an event + while (subtaskScheduler.schedule()) { + // If the event is consumed successfully, then continue to consume the next event + // otherwise, stop consuming + if (!executeOnce()) { + break; + } + hasAtLeastOneEventProcessed = true; + } + } finally { + // Reset the scheduler to make sure that the scheduler can schedule again + subtaskScheduler.reset(); + } + + return hasAtLeastOneEventProcessed; + } + + /** Should be synchronized with {@link PipeConfigSubtask#releaseLastEvent} */ + private synchronized void setLastEvent(Event event) { + lastEvent = event; + } + + /** + * Try to consume an event by the pipe plugin. + * + * @return true if the event is consumed successfully, false if no more event can be consumed + * @throws Exception if any error occurs when consuming the event + */ + @SuppressWarnings("squid:S112") // Allow to throw Exception + private boolean executeOnce() throws Exception { + if (isClosed.get()) { + return false; + } + + final Event event = lastEvent != null ? lastEvent : null; // TODO: get an event from upstream + // Record the last event for retry when exception occurs + setLastEvent(event); + // TODO: process the event if necessary + + try { + if (event == null) { + return false; + } + + if (event instanceof TabletInsertionEvent) { Review Comment: ConfigSubTask will not get those events ########## iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/execution/executor/PipeConfigSubtaskExecutor.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.execution.executor; Review Comment: Do datanodes need this? If not, why not put it into confignode packet? ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java: ########## @@ -100,16 +102,37 @@ public boolean isLocked() { /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */ public TSStatus createPipe(TCreatePipeReq req) { - return configManager.getProcedureManager().createPipe(req); + final TSStatus status = configManager.getProcedureManager().createPipe(req); Review Comment: Move the control logic into procedure in case leader configNode crashes, and assure the idempotency of the logic to ensure the redo success of the procedure step. ########## iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeConfigSubtask.java: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.task.subtask; + +import org.apache.iotdb.commons.pipe.execution.scheduler.PipeConfigSubtaskScheduler; +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.schema.IoTDBSchemaConnector; +import org.apache.iotdb.commons.pipe.task.DecoratingLock; +import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class PipeConfigSubtask + implements FutureCallback<Boolean>, Callable<Boolean>, AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeConfigSubtask.class); + + // Used for identifying the subtask + private final String taskID; + + // For thread pool to execute subtasks + private ListeningExecutorService subtaskWorkerThreadPoolExecutor; + + // For controlling the subtask execution + private final AtomicBoolean shouldStopSubmittingSelf = new AtomicBoolean(true); + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private PipeConfigSubtaskScheduler subtaskScheduler; + + // For fail-over + public static final int MAX_RETRY_TIMES = 5; + private final AtomicInteger retryCount = new AtomicInteger(0); + private Event lastEvent; + + private final PipeConnector outputPipeConnector; + + // For thread pool to execute callbacks + private final DecoratingLock callbackDecoratingLock = new DecoratingLock(); + private ExecutorService subtaskCallbackListeningExecutor; + + // For controlling subtask submitting, making sure that a subtask is submitted to only one thread + // at a time + private volatile boolean isSubmitted = false; + + public PipeConfigSubtask( + String taskID, + Map<String, String> extractorAttributes, + Map<String, String> connectorAttributes) + throws Exception { + this.taskID = taskID; + this.outputPipeConnector = new IoTDBSchemaConnector(); + // This connector takes the responsibility of both extractor and connector, + // so we need to merge the attributes of extractor and connector + Map<String, String> allAttributes = new HashMap<>(); + allAttributes.putAll(extractorAttributes); + allAttributes.putAll(connectorAttributes); + this.outputPipeConnector.validate( + new PipeParameterValidator(new PipeParameters(allAttributes))); + } + + public void bindExecutors( + ListeningExecutorService subtaskWorkerThreadPoolExecutor, + ExecutorService subtaskCallbackListeningExecutor, + PipeConfigSubtaskScheduler subtaskScheduler) { + this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor; + this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor; + this.subtaskScheduler = subtaskScheduler; + } + + @Override + public Boolean call() throws Exception { + boolean hasAtLeastOneEventProcessed = false; + + try { + // If the scheduler allows to schedule, then try to consume an event + while (subtaskScheduler.schedule()) { + // If the event is consumed successfully, then continue to consume the next event + // otherwise, stop consuming + if (!executeOnce()) { + break; + } + hasAtLeastOneEventProcessed = true; + } + } finally { + // Reset the scheduler to make sure that the scheduler can schedule again + subtaskScheduler.reset(); + } + + return hasAtLeastOneEventProcessed; + } + + /** Should be synchronized with {@link PipeConfigSubtask#releaseLastEvent} */ + private synchronized void setLastEvent(Event event) { + lastEvent = event; + } + + /** + * Try to consume an event by the pipe plugin. + * + * @return true if the event is consumed successfully, false if no more event can be consumed + * @throws Exception if any error occurs when consuming the event + */ + @SuppressWarnings("squid:S112") // Allow to throw Exception + private boolean executeOnce() throws Exception { + if (isClosed.get()) { + return false; + } + + final Event event = lastEvent != null ? lastEvent : null; // TODO: get an event from upstream + // Record the last event for retry when exception occurs + setLastEvent(event); + // TODO: process the event if necessary + + try { + if (event == null) { + return false; + } + + if (event instanceof TabletInsertionEvent) { + outputPipeConnector.transfer((TabletInsertionEvent) event); + } else if (event instanceof TsFileInsertionEvent) { + outputPipeConnector.transfer((TsFileInsertionEvent) event); + } else { + outputPipeConnector.transfer(event); + } + + releaseLastEvent(true); + } catch (PipeConnectionException e) { + if (!isClosed.get()) { + throw e; + } else { + LOGGER.info("PipeConnectionException in pipe transfer, ignored because pipe is dropped."); + releaseLastEvent(false); + } + } catch (Exception e) { + if (!isClosed.get()) { + throw new PipeException( Review Comment: The exception message too needs to be updated since the users won't implement the connector.. ########## iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/ddl/ISchemaEvent.java: ########## @@ -0,0 +1,5 @@ +package org.apache.iotdb.pipe.api.event.ddl; Review Comment: license? ########## iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/ddl/SchemaEvent.java: ########## @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.api.event.ddl; + +public abstract class SchemaEvent implements ISchemaEvent {} Review Comment: There might better be a "IConsensus" event and "snapShot" event in the common packet. ########## iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/event/ddl/ISchemaEvent.java: ########## @@ -0,0 +1,5 @@ +package org.apache.iotdb.pipe.api.event.ddl; Review Comment: Besides, exposing this to api may not be a good idea... Consider common packet? ########## iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/PipeConnector.java: ########## @@ -139,6 +140,17 @@ default void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exceptio } } + /** + * This method is used to transfer schema change events. + * + * @param schemaEvent SchemaEvent to be transferred + * @throws PipeConnectionException if the connection is broken + * @throws Exception the user can throw errors if necessary + */ + default void transfer(ISchemaEvent schemaEvent) throws Exception { Review Comment: Maybe not directly expose it to api, because temporarily users may not write plugins for this ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java: ########## @@ -100,16 +102,37 @@ public boolean isLocked() { /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */ public TSStatus createPipe(TCreatePipeReq req) { - return configManager.getProcedureManager().createPipe(req); + final TSStatus status = configManager.getProcedureManager().createPipe(req); Review Comment: Maybe move the consensus plan execution logic from PipeTaskInfo to PipeTaskCoordinator, and let the PipeTaskCoordinator manage the subtasks. ########## iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeConfigSubtask.java: ########## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.task.subtask; + +import org.apache.iotdb.commons.pipe.execution.scheduler.PipeConfigSubtaskScheduler; +import org.apache.iotdb.commons.pipe.plugin.builtin.connector.schema.IoTDBSchemaConnector; +import org.apache.iotdb.commons.pipe.task.DecoratingLock; +import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class PipeConfigSubtask Review Comment: Maybe move it into the "configNode" package? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
