VGalaxies commented on code in PR #13932:
URL: https://github.com/apache/iotdb/pull/13932#discussion_r1820612763
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java:
##########
@@ -1906,21 +1910,101 @@ public SettableFuture<ConfigTaskResult>
alterPipe(AlterPipeStatement alterPipeSt
return future;
}
- // Validate pipe plugin before alteration - only validate replace mode
+ // Validate pipe existence
+ final PipeMeta pipeMetaFromCoordinator;
+ try (final ConfigNodeClient configNodeClient =
+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ final TGetAllPipeInfoResp getAllPipeInfoResp =
configNodeClient.getAllPipeInfo();
+ if (getAllPipeInfoResp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new StartupException("Failed to get pipe task meta from config
node.");
+ }
+
+ pipeMetaFromCoordinator =
+ getAllPipeInfoResp.getAllPipeInfo().stream()
+ .map(PipeMeta::deserialize)
+ .filter(
+ pipeMeta ->
+ pipeMeta
+ .getStaticMeta()
+ .getPipeName()
+ .equals(alterPipeStatement.getPipeName()))
+ .findFirst()
+ .orElse(null);
+ if (pipeMetaFromCoordinator == null) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to alter pipe %s, pipe not found in system.",
+ alterPipeStatement.getPipeName());
+ LOGGER.warn(exceptionMessage);
+ future.setException(
+ new IoTDBException(exceptionMessage,
TSStatusCode.PIPE_ERROR.getStatusCode()));
+ return future;
+ }
+ } catch (Exception e) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to alter pipe %s, because %s",
+ alterPipeStatement.getPipeName(), e.getMessage());
+ LOGGER.warn(exceptionMessage, e);
+ future.setException(
+ new IoTDBException(exceptionMessage,
TSStatusCode.PIPE_ERROR.getStatusCode()));
+ return future;
+ }
+
+ // Construct temporary pipe static meta for validation
final String pipeName = alterPipeStatement.getPipeName();
try {
- if (!alterPipeStatement.getExtractorAttributes().isEmpty()
- && alterPipeStatement.isReplaceAllExtractorAttributes()) {
-
PipeDataNodeAgent.plugin().validateExtractor(alterPipeStatement.getExtractorAttributes());
- }
- if (!alterPipeStatement.getProcessorAttributes().isEmpty()
- && alterPipeStatement.isReplaceAllProcessorAttributes()) {
-
PipeDataNodeAgent.plugin().validateProcessor(alterPipeStatement.getProcessorAttributes());
- }
- if (!alterPipeStatement.getConnectorAttributes().isEmpty()
- && alterPipeStatement.isReplaceAllConnectorAttributes()) {
- PipeDataNodeAgent.plugin()
- .validateConnector(pipeName,
alterPipeStatement.getConnectorAttributes());
+ if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
+ if (!alterPipeStatement.getExtractorAttributes().isEmpty()) {
+
PipeDataNodeAgent.plugin().validateExtractor(alterPipeStatement.getExtractorAttributes());
+ }
+ } else {
+ if (!alterPipeStatement.getExtractorAttributes().isEmpty()) {
+ pipeMetaFromCoordinator
+ .getStaticMeta()
+ .getExtractorParameters()
+ .addOrReplaceEquivalentAttributes(
+ new
PipeParameters(alterPipeStatement.getExtractorAttributes()));
+ PipeDataNodeAgent.plugin()
+ .validateExtractor(
+
pipeMetaFromCoordinator.getStaticMeta().getExtractorParameters().getAttribute());
+ }
+ }
Review Comment:
Maybe the order of if can be changed, first check
`!alterPipeStatement.getExtractorAttributes().isEmpty()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]