Copilot commented on code in PR #16672: URL: https://github.com/apache/iotdb/pull/16672#discussion_r2498470035
########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java: ########## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.schema; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.commons.utils.SerializeUtils; +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; +import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeAlterEncodingCompressorPlan; +import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan; +import org.apache.iotdb.confignode.manager.ClusterManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.state.AlterEncodingCompressorState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure.invalidateCache; +import static org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure.preparePatternTreeBytesData; + +public class AlterEncodingCompressorProcedure + extends StateMachineProcedure<ConfigNodeProcedureEnv, AlterEncodingCompressorState> { + private static final Logger LOGGER = LoggerFactory.getLogger(AlterEncodingCompressorState.class); Review Comment: Logger is created with the wrong class. The logger should be created with `AlterEncodingCompressorProcedure.class` instead of `AlterEncodingCompressorState.class`. This will result in incorrect logger names in log output, making debugging more difficult. Change to: `LoggerFactory.getLogger(AlterEncodingCompressorProcedure.class)` ```suggestion private static final Logger LOGGER = LoggerFactory.getLogger(AlterEncodingCompressorProcedure.class); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java: ########## @@ -587,6 +588,8 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) { "You should never see ContinuousSameSearchIndexSeparatorNode in this function, because ContinuousSameSearchIndexSeparatorNode should never be used in network transmission."); case 98: return LastQueryScanNode.deserialize(buffer); + case 99: + return AlignedSeriesAggregationScanNode.deserialize(buffer); Review Comment: Incorrect deserialization for ALTER_ENCODING_COMPRESSOR plan node type. Case 99 should deserialize `AlterEncodingCompressorNode` but instead deserializes `AlignedSeriesAggregationScanNode`. This will cause runtime errors when deserializing ALTER_ENCODING_COMPRESSOR operations. Change to: `return AlterEncodingCompressorNode.deserialize(buffer);` ########## iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternTree.java: ########## @@ -97,9 +97,17 @@ public void appendFullPath(PartialPath devicePath, String measurement) { appendBranchWithoutPrune(root, pathNodes, 0); } + public void appendPathPattern(final PartialPath pathPattern) { + appendPathPattern(pathPattern, false); + } + /** Add a pathPattern (may contain wildcards) to pathPatternList. */ - public void appendPathPattern(PartialPath pathPattern) { + public void appendPathPattern(final PartialPath pathPattern, final boolean isReload) { if (useWildcard) { + // This does not guarantee multi-thread safety + if (isReload && (pathPatternList == null || pathPatternList.isEmpty())) { + pathPatternList = getAllPathPatterns(); + } boolean isExist = false; for (PartialPath path : pathPatternList) { Review Comment: Variable [pathPatternList](1) may be null at this access as suggested by [this](2) null guard. ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java: ########## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.schema; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.commons.utils.SerializeUtils; +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; +import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeAlterEncodingCompressorPlan; +import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan; +import org.apache.iotdb.confignode.manager.ClusterManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.state.AlterEncodingCompressorState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure.invalidateCache; +import static org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure.preparePatternTreeBytesData; + +public class AlterEncodingCompressorProcedure + extends StateMachineProcedure<ConfigNodeProcedureEnv, AlterEncodingCompressorState> { + private static final Logger LOGGER = LoggerFactory.getLogger(AlterEncodingCompressorState.class); + private String queryId; + private PathPatternTree patternTree; + private boolean ifExists; + private byte encoding; + private byte compressor; + private boolean mayAlterAudit; + + private transient ByteBuffer patternTreeBytes; + private transient String requestMessage; + + public AlterEncodingCompressorProcedure(final boolean isGeneratedByPipe) { + super(isGeneratedByPipe); + } + + public AlterEncodingCompressorProcedure( + final boolean isGeneratedByPipe, + final String queryId, + final PathPatternTree pathPatternTree, + final boolean ifExists, + final byte encoding, + final byte compressor, + final boolean mayAlterAudit) { + super(isGeneratedByPipe); + this.queryId = queryId; + setPatternTree(pathPatternTree); + this.ifExists = ifExists; + this.encoding = encoding; + this.compressor = compressor; + this.mayAlterAudit = mayAlterAudit; + } + + public String getQueryId() { + return queryId; + } + + @TestOnly + public PathPatternTree getPatternTree() { + return patternTree; + } + + public void setPatternTree(final PathPatternTree patternTree) { + this.patternTree = patternTree; + requestMessage = patternTree.getAllPathPatterns().toString(); + patternTreeBytes = preparePatternTreeBytesData(patternTree); + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final AlterEncodingCompressorState state) + throws InterruptedException { + final long startTime = System.currentTimeMillis(); + try { + switch (state) { + case ALTER_SCHEMA_REGION: + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Alter encoding {} & compressor {} in schema region for timeSeries {}", + SerializeUtils.deserializeEncodingNullable(encoding), + SerializeUtils.deserializeCompressorNullable(compressor), + requestMessage); + } + alterEncodingCompressorInSchemaRegion(env); + break; + case CLEAR_CACHE: + LOGGER.info("Invalidate cache of timeSeries {}", requestMessage); + invalidateCache(env, patternTreeBytes, requestMessage, this::setFailure); + collectPayload4Pipe(env); + return Flow.NO_MORE_STATE; + default: + setFailure(new ProcedureException("Unrecognized state " + state)); + return Flow.NO_MORE_STATE; + } + return Flow.HAS_MORE_STATE; + } finally { + LOGGER.info( + "AlterEncodingCompressor-[{}] costs {}ms", + state, + (System.currentTimeMillis() - startTime)); + } + } + + private void alterEncodingCompressorInSchemaRegion(final ConfigNodeProcedureEnv env) { + final Map<TConsensusGroupId, TRegionReplicaSet> relatedSchemaRegionGroup = + env.getConfigManager().getRelatedSchemaRegionGroup(patternTree, mayAlterAudit); + + if (relatedSchemaRegionGroup.isEmpty()) { + if (!ifExists) { + setFailure( + new ProcedureException( + new PathNotExistException( + patternTree.getAllPathPatterns().stream() + .map(PartialPath::getFullPath) + .collect(Collectors.toList()), + false))); + } + return; + } + + final DataNodeTSStatusTaskExecutor<TAlterEncodingCompressorReq> alterEncodingCompressorTask = + new DataNodeTSStatusTaskExecutor<TAlterEncodingCompressorReq>( + env, + env.getConfigManager().getRelatedSchemaRegionGroup(patternTree, mayAlterAudit), + false, + CnToDnAsyncRequestType.ALTER_ENCODING_COMPRESSOR, + ((dataNodeLocation, consensusGroupIdList) -> + new TAlterEncodingCompressorReq(consensusGroupIdList, patternTreeBytes, ifExists) + .setCompressor(compressor) + .setEncoding(encoding))) { + + private final Map<TDataNodeLocation, TSStatus> failureMap = new HashMap<>(); + + @Override + protected List<TConsensusGroupId> processResponseOfOneDataNode( + final TDataNodeLocation dataNodeLocation, + final List<TConsensusGroupId> consensusGroupIdList, + final TSStatus response) { + final List<TConsensusGroupId> failedRegionList = new ArrayList<>(); + if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + failureMap.remove(dataNodeLocation); + return failedRegionList; + } + + if (response.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + final List<TSStatus> subStatus = response.getSubStatus(); + for (int i = 0; i < subStatus.size(); i++) { + if (subStatus.get(i).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && !(subStatus.get(i).getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode() + && ifExists)) { + failedRegionList.add(consensusGroupIdList.get(i)); + } + } + } else if (!(response.getCode() == TSStatusCode.PATH_NOT_EXIST.getStatusCode() + && ifExists)) { + failedRegionList.addAll(consensusGroupIdList); + } + if (!failedRegionList.isEmpty()) { + failureMap.put(dataNodeLocation, response); + } else { + failureMap.remove(dataNodeLocation); + } + return failedRegionList; + } + + @Override + protected void onAllReplicasetFailure( + final TConsensusGroupId consensusGroupId, + final Set<TDataNodeLocation> dataNodeLocationSet) { + setFailure( + new ProcedureException( + new MetadataException( + String.format( + "Alter encoding compressor %s in schema regions failed. Failures: %s", + requestMessage, failureMap)))); + interruptTask(); + } + }; + alterEncodingCompressorTask.execute(); + setNextState(AlterEncodingCompressorState.CLEAR_CACHE); + } + + private void collectPayload4Pipe(final ConfigNodeProcedureEnv env) { + TSStatus result; + try { + result = + env.getConfigManager() + .getConsensusManager() + .write( + isGeneratedByPipe + ? new PipeEnrichedPlan( + new PipeAlterEncodingCompressorPlan( + patternTreeBytes, encoding, compressor, mayAlterAudit)) + : new PipeAlterEncodingCompressorPlan( + patternTreeBytes, encoding, compressor, mayAlterAudit)); + } catch (final ConsensusException e) { + LOGGER.warn(ClusterManager.CONSENSUS_WRITE_ERROR, e); + result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + result.setMessage(e.getMessage()); + } + if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(result.getMessage()); + } + } + + @Override + protected void rollbackState( + final ConfigNodeProcedureEnv env, + final AlterEncodingCompressorState alterEncodingCompressorState) + throws IOException, InterruptedException, ProcedureException { + // Not supported now + } + + @Override + protected AlterEncodingCompressorState getState(final int stateId) { + return AlterEncodingCompressorState.values()[stateId]; + } + + @Override + protected int getStateId(final AlterEncodingCompressorState alterEncodingCompressorState) { + return alterEncodingCompressorState.ordinal(); + } + + @Override + protected AlterEncodingCompressorState getInitialState() { + return AlterEncodingCompressorState.ALTER_SCHEMA_REGION; + } + + @Override + public void serialize(final DataOutputStream stream) throws IOException { + stream.writeShort( + isGeneratedByPipe + ? ProcedureType.PIPE_ENRICHED_ALTER_ENCODING_COMPRESSOR_PROCEDURE.getTypeCode() + : ProcedureType.ALTER_ENCODING_COMPRESSOR_PROCEDURE.getTypeCode()); + super.serialize(stream); + ReadWriteIOUtils.write(queryId, stream); + patternTree.serialize(stream); + ReadWriteIOUtils.write(ifExists, stream); + ReadWriteIOUtils.write(encoding, stream); + ReadWriteIOUtils.write(compressor, stream); + ReadWriteIOUtils.write(mayAlterAudit, stream); + } + + @Override + public void deserialize(final ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + queryId = ReadWriteIOUtils.readString(byteBuffer); + setPatternTree(PathPatternTree.deserialize(byteBuffer)); + ifExists = ReadWriteIOUtils.readBoolean(byteBuffer); + encoding = ReadWriteIOUtils.readByte(byteBuffer); + compressor = ReadWriteIOUtils.readByte(byteBuffer); + mayAlterAudit = ReadWriteIOUtils.readBoolean(byteBuffer); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; Review Comment: Inconsistent formatting in equals method. Line 301 lacks braces for the if statement, while the rest of the codebase consistently uses braces. For consistency with the rest of the method and codebase, add braces: `if (this == o) { return true; }` ```suggestion if (this == o) { return true; } ``` ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/pipe/payload/PipeAlterEncodingCompressorPlan.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.consensus.request.write.pipe.payload; + +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; +import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; + +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import javax.annotation.Nonnull; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +public class PipeAlterEncodingCompressorPlan extends ConfigPhysicalPlan { Review Comment: Class PipeAlterEncodingCompressorPlan overrides [hashCode](1) but not equals. ########## iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java: ########## @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.schema; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PathPatternTree; +import org.apache.iotdb.commons.utils.SerializeUtils; +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; +import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeAlterEncodingCompressorPlan; +import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan; +import org.apache.iotdb.confignode.manager.ClusterManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.state.AlterEncodingCompressorState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq; +import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure.invalidateCache; +import static org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure.preparePatternTreeBytesData; + +public class AlterEncodingCompressorProcedure + extends StateMachineProcedure<ConfigNodeProcedureEnv, AlterEncodingCompressorState> { + private static final Logger LOGGER = LoggerFactory.getLogger(AlterEncodingCompressorState.class); + private String queryId; + private PathPatternTree patternTree; + private boolean ifExists; + private byte encoding; + private byte compressor; + private boolean mayAlterAudit; + + private transient ByteBuffer patternTreeBytes; + private transient String requestMessage; + + public AlterEncodingCompressorProcedure(final boolean isGeneratedByPipe) { + super(isGeneratedByPipe); + } + + public AlterEncodingCompressorProcedure( + final boolean isGeneratedByPipe, + final String queryId, + final PathPatternTree pathPatternTree, + final boolean ifExists, + final byte encoding, + final byte compressor, + final boolean mayAlterAudit) { + super(isGeneratedByPipe); + this.queryId = queryId; + setPatternTree(pathPatternTree); + this.ifExists = ifExists; + this.encoding = encoding; + this.compressor = compressor; + this.mayAlterAudit = mayAlterAudit; + } + + public String getQueryId() { + return queryId; + } + + @TestOnly + public PathPatternTree getPatternTree() { + return patternTree; + } + + public void setPatternTree(final PathPatternTree patternTree) { + this.patternTree = patternTree; + requestMessage = patternTree.getAllPathPatterns().toString(); + patternTreeBytes = preparePatternTreeBytesData(patternTree); + } + + @Override + protected Flow executeFromState( + final ConfigNodeProcedureEnv env, final AlterEncodingCompressorState state) + throws InterruptedException { + final long startTime = System.currentTimeMillis(); + try { + switch (state) { + case ALTER_SCHEMA_REGION: + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Alter encoding {} & compressor {} in schema region for timeSeries {}", + SerializeUtils.deserializeEncodingNullable(encoding), + SerializeUtils.deserializeCompressorNullable(compressor), + requestMessage); + } + alterEncodingCompressorInSchemaRegion(env); + break; + case CLEAR_CACHE: + LOGGER.info("Invalidate cache of timeSeries {}", requestMessage); + invalidateCache(env, patternTreeBytes, requestMessage, this::setFailure); + collectPayload4Pipe(env); + return Flow.NO_MORE_STATE; + default: + setFailure(new ProcedureException("Unrecognized state " + state)); + return Flow.NO_MORE_STATE; + } + return Flow.HAS_MORE_STATE; + } finally { + LOGGER.info( + "AlterEncodingCompressor-[{}] costs {}ms", + state, + (System.currentTimeMillis() - startTime)); + } + } + + private void alterEncodingCompressorInSchemaRegion(final ConfigNodeProcedureEnv env) { + final Map<TConsensusGroupId, TRegionReplicaSet> relatedSchemaRegionGroup = + env.getConfigManager().getRelatedSchemaRegionGroup(patternTree, mayAlterAudit); + + if (relatedSchemaRegionGroup.isEmpty()) { + if (!ifExists) { + setFailure( + new ProcedureException( + new PathNotExistException( + patternTree.getAllPathPatterns().stream() + .map(PartialPath::getFullPath) + .collect(Collectors.toList()), + false))); + } + return; + } + + final DataNodeTSStatusTaskExecutor<TAlterEncodingCompressorReq> alterEncodingCompressorTask = + new DataNodeTSStatusTaskExecutor<TAlterEncodingCompressorReq>( + env, + env.getConfigManager().getRelatedSchemaRegionGroup(patternTree, mayAlterAudit), + false, + CnToDnAsyncRequestType.ALTER_ENCODING_COMPRESSOR, + ((dataNodeLocation, consensusGroupIdList) -> Review Comment: The parameter 'dataNodeLocation' is never used. ```suggestion ((_, consensusGroupIdList) -> ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
