Copilot commented on code in PR #17303:
URL: https://github.com/apache/iotdb/pull/17303#discussion_r2944121586


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/PipeConsensus.java:
##########
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import 
org.apache.iotdb.commons.consensus.iotv2.container.IoTV2GlobalComponentContainer;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.service.RegisterManager;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusDeleteLocalPeerKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerCoordinatorKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
+import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationGuardian;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationManager;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.service.AirReplicationRPCService;
+import 
org.apache.iotdb.consensus.air.service.AirReplicationRPCServiceProcessor;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+public class AirReplication implements IConsensus {
+  private static final String REPLICATION_AIR_GUARDIAN_TASK_ID = 
"replication_air_guardian";
+  private static final String CLASS_NAME = 
AirReplication.class.getSimpleName();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AirReplication.class);

Review Comment:
   `PipeConsensus.java` declares `public class AirReplication`, which will not 
compile because the public class name must match the filename. Rename the class 
to `PipeConsensus` or rename the file to `AirReplication.java` (and remove the 
duplicate `AirReplication.java` added in this PR).



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCService.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.air.thrift.AirReplicationIService;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
+
+public class AirReplicationRPCService extends ThriftService implements 
AirReplicationRPCServiceMBean {
+
+  private final TEndPoint thisNode;
+  private final AirReplicationConfig config;
+  private AirReplicationRPCServiceProcessor airReplicationRPCServiceProcessor;
+
+  public AirReplicationRPCService(TEndPoint thisNode, AirReplicationConfig 
config) {
+    this.thisNode = thisNode;
+    this.config = config;
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.AIR_REPLICATION_SERVICE;
+  }

Review Comment:
   `getID()` returns `ServiceType.AIR_REPLICATION_SERVICE`, but the only 
consensus service entries `ServiceType` currently defines are 
`IOT_CONSENSUS_SERVICE` and `PIPE_CONSENSUS_SERVICE` (there is no 
`AIR_REPLICATION_SERVICE`). This will not compile unless `ServiceType` is 
extended, or this service reuses an existing `ServiceType` value.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationSyncLagManager.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.metric;
+
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationSink;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This class is used to aggregate the write progress of all Connectors to 
calculate the minimum
+ * synchronization progress of all follower copies, thereby calculating 
syncLag.
+ *
+ * <p>Note: every consensusGroup/dataRegion has and only has 1 instance of 
this class.
+ */
+public class AirReplicationSyncLagManager {
+  long syncLag = Long.MIN_VALUE;
+  ReentrantLock lock = new ReentrantLock();
+  Map<AirReplicationName, AirReplicationSink> airReplication2ConnectorMap = 
new ConcurrentHashMap<>();
+
+  /**
+   * pinnedCommitIndex - currentReplicateProgress. If res <= 0, indicating 
that replication is
+   * finished.
+   */
+  public long getSyncLagForRegionMigration(
+      AirReplicationName airReplicationName, long pinnedCommitIndex) {
+    return 
Optional.ofNullable(airReplication2ConnectorMap.get(airReplicationName))
+        .map(
+            airReplicationSink ->
+                Math.max(pinnedCommitIndex - 
airReplicationSink.getFollowerApplyProgress(), 0L))
+        .orElse(0L);
+  }
+
+  /**
+   * userWriteProgress - currentReplicateProgress. If res <= 0, indicating 
that replication is
+   * finished.
+   */
+  public long getSyncLagForSpecificAirReplication(AirReplicationName 
airReplicationName) {
+    return 
Optional.ofNullable(airReplication2ConnectorMap.get(airReplicationName))
+        .map(
+            airReplicationSink -> {
+              long userWriteProgress = 
airReplicationSink.getLeaderReplicateProgress();
+              long replicateProgress = 
airReplicationSink.getFollowerApplyProgress();
+              return Math.max(userWriteProgress - replicateProgress, 0L);
+            })
+        .orElse(0L);
+  }
+
+  public long getCurrentLeaderReplicateIndex(AirReplicationName 
airReplicationName) {
+    return 
Optional.ofNullable(airReplication2ConnectorMap.get(airReplicationName))
+        .map(AirReplicationSink::getLeaderReplicateProgress)
+        .orElse(0L);
+  }
+
+  public void addAirReplicationConnector(
+      AirReplicationName airReplicationName, AirReplicationSink 
airReplicationSink) {
+    lock.lock();
+    try {
+      airReplication2ConnectorMap.put(airReplicationName, airReplicationSink);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void removeAirReplicationConnector(AirReplicationName 
airReplicationName) {
+    lock.lock();
+    try {
+      airReplication2ConnectorMap.remove(airReplicationName);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * SyncLag represents the biggest difference between the current replica 
users' write progress and
+   * the synchronization progress of all other replicas. The semantics is how 
much data the leader
+   * has left to synchronize.
+   */
+  public long calculateSyncLag() {
+    lock.lock();
+    try {
+      // if there isn't a air replication task, the syncLag is 0
+      if (airReplication2ConnectorMap.isEmpty()) {
+        return 0;
+      }
+      // else we find the biggest gap between leader and replicas in all air 
replication task.
+      syncLag = Long.MIN_VALUE;
+      airReplication2ConnectorMap
+          .keySet()
+          .forEach(
+              airReplicationName ->
+                  syncLag =
+                      Math.max(syncLag, 
getSyncLagForSpecificAirReplication(airReplicationName)));
+      return syncLag;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void clear() {
+    this.airReplication2ConnectorMap.clear();
+  }
+
+  private AirReplicationSyncLagManager() {
+    // do nothing
+  }
+
+  private static class AirReplicationSyncLagManagerHolder {
+    private static Map<String, AirReplicationSyncLagManager> 
REPLICATION_GROUP_ID_2_INSTANCE_MAP;
+
+    private AirReplicationSyncLagManagerHolder() {
+      // empty constructor
+    }
+
+    private static void build() {
+      if (REPLICATION_GROUP_ID_2_INSTANCE_MAP == null) {
+        REPLICATION_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
+      }
+    }
+  }
+
+  public static AirReplicationSyncLagManager getInstance(String groupId) {
+    return 
AirReplicationSyncLagManagerHolder.REPLICATION_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
+        groupId, key -> new AirReplicationSyncLagManager());
+  }

Review Comment:
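   `AirReplicationSyncLagManagerHolder.REPLICATION_GROUP_ID_2_INSTANCE_MAP` is 
never initialized before `getInstance()` calls `computeIfAbsent(...)`, which 
will throw an NPE unless `AirReplicationSyncLagManager.build()` is called 
first. I couldn't find any call sites for 
`AirReplicationSyncLagManager.build()` in the current codebase. Consider 
initializing the map eagerly (`private static final`), which is the simplest 
safe option. Alternatively, call `build()` inside `getInstance()`/`release()`, 
but note that the null check in the holder's `build()` is unsynchronized and 
the field is not `volatile`, so two threads could each create a map and one 
registered instance could be lost; that path needs synchronization to be safe.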



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCService.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.service.ThriftService;
+import org.apache.iotdb.commons.service.ThriftServiceThread;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.air.thrift.AirReplicationIService;
+import org.apache.iotdb.rpc.ZeroCopyRpcTransportFactory;
+
+public class AirReplicationRPCService extends ThriftService implements 
AirReplicationRPCServiceMBean {
+
+  private final TEndPoint thisNode;
+  private final AirReplicationConfig config;
+  private AirReplicationRPCServiceProcessor airReplicationRPCServiceProcessor;
+
+  public AirReplicationRPCService(TEndPoint thisNode, AirReplicationConfig 
config) {
+    this.thisNode = thisNode;
+    this.config = config;
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.AIR_REPLICATION_SERVICE;
+  }
+
+  @Override
+  public void initSyncedServiceImpl(Object airReplicationRPCServiceProcessor) {
+    this.airReplicationRPCServiceProcessor =
+        (AirReplicationRPCServiceProcessor) airReplicationRPCServiceProcessor;
+    super.initSyncedServiceImpl(this.airReplicationRPCServiceProcessor);
+  }
+
+  @Override
+  public void initTProcessor() {
+    processor = new 
AirReplicationIService.Processor<>(airReplicationRPCServiceProcessor);
+  }
+
+  @Override
+  public void initThriftServiceThread() throws IllegalAccessException {
+    try {
+      thriftServiceThread =
+          config.getRpc().isEnableSSL()
+              ? new ThriftServiceThread(
+                  processor,
+                  getID().getName(),
+                  ThreadName.AIR_REPLICATION_RPC_PROCESSOR.getName(),
+                  getBindIP(),
+                  getBindPort(),
+                  config.getRpc().getRpcMaxConcurrentClientNum(),
+                  config.getRpc().getThriftServerAwaitTimeForStopService(),
+                  new 
AirReplicationRPCServiceHandler(airReplicationRPCServiceProcessor),
+                  config.getRpc().isRpcThriftCompressionEnabled(),
+                  config.getRpc().getSslKeyStorePath(),
+                  config.getRpc().getSslKeyStorePassword(),
+                  config.getRpc().getSslTrustStorePath(),
+                  config.getRpc().getSslTrustStorePassword(),
+                  ZeroCopyRpcTransportFactory.INSTANCE)
+              : new ThriftServiceThread(
+                  processor,
+                  getID().getName(),
+                  ThreadName.AIR_REPLICATION_RPC_PROCESSOR.getName(),
+                  getBindIP(),
+                  getBindPort(),
+                  config.getRpc().getRpcMaxConcurrentClientNum(),
+                  config.getRpc().getThriftServerAwaitTimeForStopService(),
+                  new 
AirReplicationRPCServiceHandler(airReplicationRPCServiceProcessor),
+                  config.getRpc().isRpcThriftCompressionEnabled(),
+                  ZeroCopyRpcTransportFactory.INSTANCE);
+    } catch (RPCServiceException e) {
+      throw new IllegalAccessException(e.getMessage());
+    }
+    
thriftServiceThread.setName(ThreadName.AIR_REPLICATION_RPC_SERVICE.getName());
+  }

Review Comment:
   `ThreadName.AIR_REPLICATION_RPC_PROCESSOR` and 
`ThreadName.AIR_REPLICATION_RPC_SERVICE` don't exist in 
`org.apache.iotdb.commons.concurrent.ThreadName` (no AIR_REPLICATION entries). 
This will fail compilation; please add the enum constants or reuse existing 
thread names.



##########
iotdb-core/ainode/standalone_finetune/model/model_storage.py:
##########
@@ -0,0 +1,91 @@
+"""Minimal model registry for standalone: register_finetuned_model, 
complete_finetune, fail_finetune."""
+import logging
+import os
+import shutil
+from typing import Dict, Optional
+
+from standalone_finetune.config import get_models_dir
+from standalone_finetune.model.model_constants import ModelCategory, 
ModelStates
+from standalone_finetune.model.model_info import 
BUILTIN_HF_TRANSFORMERS_MODEL_MAP, ModelInfo
+
+logger = logging.getLogger(__name__)
+
+
+def _ensure_init(path: str) -> None:
+    os.makedirs(path, exist_ok=True)
+    init_file = os.path.join(path, "__init__.py")
+    if not os.path.exists(init_file):
+        with open(init_file, "w"):
+            pass
+
+
+class ModelStorage:
+    def __init__(self):
+        self._models_dir = get_models_dir()
+        self._finetuned: Dict[str, ModelInfo] = {}
+        self._init_dirs()
+
+    def _init_dirs(self):
+        for cat in ModelCategory:
+            p = os.path.join(self._models_dir, cat.value)
+            _ensure_init(p)
+
+    def get_model_info(self, model_id: str, category: Optional[ModelCategory] 
= None) -> Optional[ModelInfo]:
+        if category == ModelCategory.FINE_TUNED:
+            return self._finetuned.get(model_id)
+        if model_id in BUILTIN_HF_TRANSFORMERS_MODEL_MAP:
+            return BUILTIN_HF_TRANSFORMERS_MODEL_MAP[model_id]
+        for cat in (ModelCategory.USER_DEFINED, ModelCategory.FINE_TUNED):
+            dir_path = os.path.join(self._models_dir, cat.value, model_id)
+            if os.path.isdir(dir_path):
+                info = ModelInfo(
+                    model_id=model_id,
+                    category=cat,
+                    state=ModelStates.ACTIVE,
+                    model_type="",
+                    base_model_id=model_id if cat == ModelCategory.FINE_TUNED 
else None,
+                )
+                if model_id in BUILTIN_HF_TRANSFORMERS_MODEL_MAP:
+                    base = BUILTIN_HF_TRANSFORMERS_MODEL_MAP.get(model_id)
+                    if base:
+                        info.model_type = base.model_type
+                        info.auto_map = base.auto_map
+                        info.pipeline_cls = base.pipeline_cls
+                return info
+        return None
+
+    def register_finetuned_model(self, model_id: str, base_model_id: str) -> 
ModelInfo:
+        if model_id in self._finetuned:
+            raise ValueError(f"Model already registered: {model_id}")
+        base = self.get_model_info(base_model_id)
+        if base is None:
+            raise ValueError(f"Base model not found: {base_model_id}")
+        model_dir = os.path.join(self._models_dir, 
ModelCategory.FINE_TUNED.value, model_id)
+        _ensure_init(model_dir)
+        sft_info = base.copy(
+            model_id=model_id,
+            category=ModelCategory.FINE_TUNED,
+            state=ModelStates.TRAINING,
+            base_model_id=base_model_id,
+        )
+        self._finetuned[model_id] = sft_info
+        logger.info(f"Registered fine-tuned model {model_id} based on 
{base_model_id}")
+        return base

Review Comment:
   `register_finetuned_model` constructs `sft_info` for the new fine-tuned 
model, but returns `base` instead. Callers (e.g., CLI) will receive the base 
model info, not the newly registered fine-tuned model, which breaks downstream 
logic (e.g., state tracking and metadata). Return `sft_info` here.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.consensus.air.AirReplicationServerImpl;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class AirReplicationServerMetrics implements IMetricSet {
+  private final AirReplicationServerImpl impl;
+  private final AirReplicationSyncLagManager syncLagManager;
+
+  private Timer getStateMachineLockTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer userWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer replicaWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  public AirReplicationServerMetrics(AirReplicationServerImpl impl) {
+    this.impl = impl;
+    this.syncLagManager = 
AirReplicationSyncLagManager.getInstance(impl.getConsensusGroupId());
+  }
+
+  private static final String IMPL = "AirReplicationServerImpl";
+
+  public void recordGetStateMachineLockTime(long costTimeInNanos) {
+    getStateMachineLockTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordUserWriteStateMachineTime(long costTimeInNanos) {
+    userWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordReplicaWriteStateMachineTime(long costTimeInNanos) {
+    replicaWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindAutoGauge(metricService);
+    bindGauge(metricService);
+    bindStageTimer(metricService);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbindAutoGauge(metricService);
+    unbindGauge(metricService);
+    unbindStageTimer(metricService);
+
+    // release corresponding resource
+    AirReplicationSyncLagManager.release(impl.getConsensusGroupId());
+  }
+
+  public void bindGauge(AbstractMetricService metricService) {
+    metricService
+        .getOrCreateGauge(
+            Metric.AIR_REPLICATION_MODE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            IMPL,
+            Tag.TYPE.toString(),
+            "replicateMode")
+        .set(impl.getReplicateMode());
+  }

Review Comment:
   `Metric.AIR_REPLICATION_MODE` and `Metric.AIR_REPLICATION` are referenced 
here, but `org.apache.iotdb.commons.service.metric.enums.Metric` does not 
define those enum values (only PIPE_* etc). This will not compile unless the 
Metric enum is extended, or these metrics are renamed to existing metric keys.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/AirReplication.java:
##########
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.async.AsyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncAirReplicationServiceClient;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import 
org.apache.iotdb.commons.consensus.iotv2.container.IoTV2GlobalComponentContainer;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.commons.service.RegisterManager;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusDeleteLocalPeerKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerCoordinatorKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
+import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationGuardian;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationManager;
+import org.apache.iotdb.consensus.air.airreplication.AirReplicationName;
+import org.apache.iotdb.consensus.air.service.AirReplicationRPCService;
+import 
org.apache.iotdb.consensus.air.service.AirReplicationRPCServiceProcessor;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+public class AirReplication implements IConsensus {
+  private static final String REPLICATION_AIR_GUARDIAN_TASK_ID = 
"replication_air_guardian";
+  private static final String CLASS_NAME = 
AirReplication.class.getSimpleName();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AirReplication.class);

Review Comment:
   This PR adds a second `org.apache.iotdb.consensus.air.AirReplication` 
implementation in addition to `PipeConsensus.java` (which also defines 
`AirReplication`). Having two sources defining the same fully-qualified class 
will cause a duplicate class compilation error; please keep only one canonical 
implementation and delete/rename the other.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/service/AirReplicationRPCServiceProcessor.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.service;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
+import 
org.apache.iotdb.commons.utils.KillPoint.IoTConsensusInactivatePeerKillPoints;
+import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.config.AirReplicationConfig;
+import org.apache.iotdb.consensus.exception.ConsensusGroupModifyPeerException;
+import org.apache.iotdb.consensus.air.AirReplication;
+import org.apache.iotdb.consensus.air.AirReplicationServerImpl;
+import org.apache.iotdb.consensus.air.thrift.AirReplicationIService;
+import org.apache.iotdb.consensus.air.thrift.TCheckAirReplicationCompletedReq;
+import org.apache.iotdb.consensus.air.thrift.TCheckAirReplicationCompletedResp;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToCreateAirReplicationReq;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToCreateAirReplicationResp;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToDropAirReplicationReq;
+import 
org.apache.iotdb.consensus.air.thrift.TNotifyPeerToDropAirReplicationResp;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationBatchTransferReq;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationBatchTransferResp;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationTransferReq;
+import org.apache.iotdb.consensus.air.thrift.TAirReplicationTransferResp;
+import org.apache.iotdb.consensus.pipe.thrift.TSetActiveReq;
+import org.apache.iotdb.consensus.pipe.thrift.TSetActiveResp;
+import 
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceReq;
+import 
org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceResp;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AirReplicationRPCServiceProcessor implements 
AirReplicationIService.Iface {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(AirReplicationRPCServiceProcessor.class);
+  private final AirReplication airReplication;
+
+  private final AirReplicationConfig.Air config;
+
+  public AirReplicationRPCServiceProcessor(
+      AirReplication airReplication, AirReplicationConfig.Air config) {
+    this.airReplication = airReplication;
+    this.config = config;
+  }
+
+  @Override
+  public TAirReplicationTransferResp 
airReplicationTransfer(TAirReplicationTransferReq req) {
+    return config.getAirReplicationReceiver().receive(req);
+  }
+
+  // TODO: consider batch transfer
+  @Override
+  public TAirReplicationBatchTransferResp airReplicationBatchTransfer(
+      TAirReplicationBatchTransferReq req) throws TException {
+    return new TAirReplicationBatchTransferResp();
+  }
+
+  @Override
+  public TSetActiveResp setActive(TSetActiveReq req) throws TException {
+    if (req.isForDeletionPurpose && !req.isActive) {
+      
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.BEFORE_INACTIVATE);
+    }
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+    AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for set active request 
%s", groupId, req);
+      LOGGER.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      return new TSetActiveResp(status);
+    }
+    impl.setActive(req.isActive);
+    if (req.isActive) {
+      KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_DONE);
+    }
+    if (req.isForDeletionPurpose && !req.isActive) {
+      
KillPoint.setKillPoint(IoTConsensusInactivatePeerKillPoints.AFTER_INACTIVATE);
+    }
+    return new TSetActiveResp(RpcUtils.SUCCESS_STATUS);
+  }
+
+  @Override
+  public TNotifyPeerToCreateConsensusPipeResp notifyPeerToCreateConsensusPipe(
+      TNotifyPeerToCreateConsensusPipeReq req) throws TException {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.targetPeerConsensusGroupId);
+    AirReplicationServerImpl impl = airReplication.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format(
+              "unexpected consensusGroupId %s for create consensus pipe 
request %s", groupId, req);
+      LOGGER.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      return new TNotifyPeerToCreateConsensusPipeResp(status);

Review Comment:
   This processor implements `AirReplicationIService.Iface`, but the imports 
and method set are inconsistent: it imports 
`TNotifyPeerToCreateAirReplication*` / `TNotifyPeerToDropAirReplication*` yet 
defines `notifyPeerToCreateConsensusPipe` / `notifyPeerToDropConsensusPipe` and 
related `T*ConsensusPipe*` request/response types that are not imported here. 
As-is, this file will not compile and likely doesn't match the generated Thrift 
interface; please align method names and Thrift types with 
`AirReplicationIService`.



##########
iotdb-core/ainode/standalone_finetune/data_provider/datasets/iotdb_tree_dataset.py:
##########
@@ -0,0 +1,177 @@
+import logging
+from typing import Dict, List, Optional
+
+import numpy as np
+import torch
+
+from standalone_finetune.data_provider.datasets.base_dataset import (
+    BasicTimeSeriesDataset,
+)
+from standalone_finetune.data_provider.processor.data_scaler import ScalerType
+
+logger = logging.getLogger(__name__)
+
+
+def _get_numeric_value(field):
+    """Extract numeric value from an IoTDB Field; return None for null / 
non-numeric."""
+    if field is None:
+        return None
+    try:
+        from iotdb.utils.IoTDBConstants import TSDataType
+
+        dt = field.get_data_type()
+        if dt is None:
+            return None
+        if dt == TSDataType.INT32:
+            return float(field.get_int_value())
+        if dt == TSDataType.INT64:
+            return float(field.get_long_value())
+        if dt == TSDataType.FLOAT:
+            return float(field.get_float_value())
+        if dt == TSDataType.DOUBLE:
+            return float(field.get_double_value())
+    except Exception:
+        pass
+    return None
+
+
+class IoTDBTreeDataset(BasicTimeSeriesDataset):
+    """
+    Dataset that executes a SQL on IoTDB (tree model) via ``Session``,
+    flattens all ``field0`` values from every device into one single series,
+    and applies a sliding window — same philosophy as ``TsFileTreeDataset``.
+    """
+
+    def __init__(
+        self,
+        sql: str,
+        iotdb_ip: str = "172.29.224.62",
+        iotdb_port: int = 6667,
+        iotdb_username: str = "root",
+        iotdb_password: str = "root",
+        train_ratio: float = 0.7,
+        seq_len: int = 2880,
+        input_token_len: int = 16,
+        output_token_lens: List[int] = None,
+        window_step: int = 1,
+        scale: bool = False,
+        scaler_type: ScalerType = "standard",
+        use_rate: float = 1.0,
+        offset_rate: float = 0.0,
+    ):
+        if output_token_lens is None:
+            output_token_lens = [96]
+        super().__init__(
+            seq_len,
+            input_token_len,
+            output_token_lens,
+            window_step,
+            scale,
+            scaler_type,
+            use_rate,
+            offset_rate,
+        )
+
+        self.sql = sql
+        self.train_ratio = train_ratio
+        self._data: Optional[np.ndarray] = None
+        self.total_windows: int = 0
+        self._window_offset: int = 0
+
+        self._session = self._open_session(
+            iotdb_ip, iotdb_port, iotdb_username, iotdb_password
+        )
+        self._init_dataset()
+
+    @staticmethod
+    def _open_session(ip: str, port: int, username: str, password: str):
+        from iotdb.Session import Session
+
+        session = Session.init_from_node_urls(
+            node_urls=[f"{ip}:{port}"],
+            user=username,
+            password=password,
+        )
+        session.open(False)
+        return session
+
+    def _fetch_data(self) -> np.ndarray:
+        """Execute SQL, flatten all numeric field0 values into one series."""
+        values: List[float] = []
+
+        self._session.execute_query_statement("SELECT field0 FROM 
root.consensus.client11.*;")
+        self._session.execute_query_statement("SELECT field0 FROM 
root.consensus.client12.*;")
+        self._session.execute_query_statement("SELECT field0 FROM 
root.consensus.client13.*;")
+        self._session.execute_query_statement("SELECT field0 FROM 
root.consensus.client14.*;")
+        self._session.execute_query_statement("SELECT field0 FROM 
root.consensus.client15.*;")
+        self._session.execute_query_statement("SELECT field0 FROM 
root.consensus.client116.*;")
+        # self._session.execute_query_statement("SELECT field0 FROM 
root.consensus.client117.*;")
+        # self._session.execute_query_statement("SELECT field0 FROM 
root.consensus.client118.*;")
+        # self._session.execute_query_statement("SELECT field0 FROM 
root.consensus.client119.*;")
+        # self._session.execute_query_statement("SELECT field0 FROM 
root.consensus.client110.*;")
+

Review Comment:
   `IoTDBTreeDataset._fetch_data()` executes several hard-coded queries against 
`root.consensus.client*` before running the user-provided `self.sql`. These 
statements ignore the `sql` parameter, introduce unexpected side effects/load, 
and their results are unused. Please remove these hard-coded queries and only 
execute the requested SQL (or make the extra queries optional and configurable).
   



##########
iotdb-core/ainode/standalone_finetune/data_provider/datasets/csv_dataset.py:
##########
@@ -0,0 +1,195 @@
+import os
+from typing import Dict, List, Optional
+
+import numpy as np
+import pandas as pd
+import torch
+
+from standalone_finetune.data_provider.datasets.base_dataset import (
+    BasicTimeSeriesDataset,
+)
+from standalone_finetune.data_provider.processor.data_scaler import ScalerType
+
+
+class CSVDataset(BasicTimeSeriesDataset):
+    """
+    Dataset for loading time series data from CSV files.
+
+    Args:
+        variables: Ordered list of column names to select from the CSV.
+            Convention: the **last** entry is the prediction target; all
+            preceding entries are covariates.  If None, all columns are used.
+    """
+
+    def __init__(
+        self,
+        data_path: str,
+        seq_len: int,
+        input_token_len: int,
+        output_token_lens: List[int],
+        variables: Optional[List[str]] = None,
+        train_ratio: float = 0.7,
+        window_step: int = 1,
+        scale: bool = False,
+        scaler_type: ScalerType = "standard",
+        use_rate: float = 1.0,
+        offset_rate: float = 0.0,
+        multivariate: bool = False,
+    ):
+        super().__init__(
+            seq_len,
+            input_token_len,
+            output_token_lens,
+            window_step,
+            scale,
+            scaler_type,
+            use_rate,
+            offset_rate,
+        )
+
+        self.data_path = data_path
+        self.variables = variables
+        self.multivariate = multivariate
+        self.train_ratio = train_ratio
+
+        self._load_data()
+
+    def _load_data(self):
+        if os.path.isdir(self.data_path):
+            csv_files = sorted(
+                [
+                    os.path.join(self.data_path, f)
+                    for f in os.listdir(self.data_path)
+                    if f.endswith(".csv")
+                ]
+            )
+            if not csv_files:
+                raise ValueError(f"No CSV files found in {self.data_path}")
+            df_list = [pd.read_csv(f) for f in csv_files]
+            df_raw = pd.concat(df_list, ignore_index=True)
+        else:
+            df_raw = pd.read_csv(self.data_path)
+
+        if self.variables is not None:
+            df_raw = df_raw[self.variables]
+
+        if isinstance(df_raw[df_raw.columns[0]][2], str):
+            data = df_raw[df_raw.columns[1:]].values

Review Comment:
   In `CSVDataset._load_data()`, `df_raw[df_raw.columns[0]][2]` is a label-based 
lookup on the default integer index, so it raises a `KeyError` (not an 
`IndexError`) when the data has fewer than 3 rows. Use a safer check such as 
`len(df_raw) > 2` and `df_raw.iloc[2, 0]` (or detect the time column by dtype) 
before slicing columns.
   



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.consensus.air.AirReplicationServerImpl;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class AirReplicationServerMetrics implements IMetricSet {
+  private final AirReplicationServerImpl impl;
+  private final AirReplicationSyncLagManager syncLagManager;
+
+  private Timer getStateMachineLockTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer userWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer replicaWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  public AirReplicationServerMetrics(AirReplicationServerImpl impl) {
+    this.impl = impl;
+    this.syncLagManager = 
AirReplicationSyncLagManager.getInstance(impl.getConsensusGroupId());
+  }
+
+  private static final String IMPL = "AirReplicationServerImpl";
+
+  public void recordGetStateMachineLockTime(long costTimeInNanos) {
+    getStateMachineLockTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordUserWriteStateMachineTime(long costTimeInNanos) {
+    userWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordReplicaWriteStateMachineTime(long costTimeInNanos) {
+    replicaWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindAutoGauge(metricService);
+    bindGauge(metricService);
+    bindStageTimer(metricService);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbindAutoGauge(metricService);
+    unbindGauge(metricService);
+    unbindStageTimer(metricService);
+
+    // release corresponding resource
+    AirReplicationSyncLagManager.release(impl.getConsensusGroupId());
+  }
+
+  public void bindGauge(AbstractMetricService metricService) {
+    metricService
+        .getOrCreateGauge(
+            Metric.AIR_REPLICATION_MODE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            IMPL,
+            Tag.TYPE.toString(),
+            "replicateMode")
+        .set(impl.getReplicateMode());
+  }
+
+  public void unbindGauge(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.GAUGE,
+        Metric.PIPE_CONSENSUS_MODE.toString(),
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.TYPE.toString(),
+        "replicateMode");
+  }
+
+  public void bindAutoGauge(AbstractMetricService metricService) {
+    metricService.createAutoGauge(
+        Metric.AIR_REPLICATION.toString(),
+        MetricLevel.IMPORTANT,
+        syncLagManager,
+        PipeConsensusSyncLagManager::calculateSyncLag,
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId(),
+        Tag.TYPE.toString(),
+        "syncLag");
+  }
+
+  public void unbindAutoGauge(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.AIR_REPLICATION.toString(),
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId(),
+        Tag.TYPE.toString(),
+        "syncLag");
+  }
+
+  public void bindStageTimer(AbstractMetricService metricService) {
+    getStateMachineLockTimer =
+        metricService.getOrCreateTimer(
+            Metric.STAGE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            Metric.AIR_REPLICATION.toString(),
+            Tag.TYPE.toString(),
+            "getStateMachineLock",
+            Tag.REGION.toString(),
+            impl.getConsensusGroupId());
+    userWriteStateMachineTimer =
+        metricService.getOrCreateTimer(
+            Metric.STAGE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            Metric.AIR_REPLICATION.toString(),
+            Tag.TYPE.toString(),
+            "userWriteStateMachine",
+            Tag.REGION.toString(),
+            impl.getConsensusGroupId());
+    replicaWriteStateMachineTimer =
+        metricService.getOrCreateTimer(
+            Metric.PIPE_RECEIVE_EVENT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            Metric.AIR_REPLICATION.toString(),
+            Tag.TYPE.toString(),
+            "replicaWriteStateMachine",
+            Tag.REGION.toString(),
+            impl.getConsensusGroupId());
+  }
+
+  public void unbindStageTimer(AbstractMetricService metricService) {
+    getStateMachineLockTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    userWriteStateMachineTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+    replicaWriteStateMachineTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.STAGE.toString(),
+        Tag.NAME.toString(),
+        Metric.AIR_REPLICATION.toString(),
+        Tag.TYPE.toString(),
+        "getStateMachineLock",
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId());
+    metricService.remove(
+        MetricType.TIMER,
+        Metric.STAGE.toString(),
+        Tag.NAME.toString(),
+        Metric.AIR_REPLICATION.toString(),
+        Tag.TYPE.toString(),
+        "writeStateMachine",
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId());
+    metricService.remove(

Review Comment:
   `unbindStageTimer` removes a timer with type tag `"writeStateMachine"`, but 
`bindStageTimer` registers `"userWriteStateMachine"` and 
`"replicaWriteStateMachine"`. This mismatch means at least one timer will not 
be unbound correctly.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.consensus.air.AirReplicationServerImpl;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class AirReplicationServerMetrics implements IMetricSet {
+  private final AirReplicationServerImpl impl;
+  private final AirReplicationSyncLagManager syncLagManager;
+
+  private Timer getStateMachineLockTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer userWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer replicaWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  public AirReplicationServerMetrics(AirReplicationServerImpl impl) {
+    this.impl = impl;
+    this.syncLagManager = 
AirReplicationSyncLagManager.getInstance(impl.getConsensusGroupId());
+  }
+
+  private static final String IMPL = "AirReplicationServerImpl";
+
+  public void recordGetStateMachineLockTime(long costTimeInNanos) {
+    getStateMachineLockTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordUserWriteStateMachineTime(long costTimeInNanos) {
+    userWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordReplicaWriteStateMachineTime(long costTimeInNanos) {
+    replicaWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindAutoGauge(metricService);
+    bindGauge(metricService);
+    bindStageTimer(metricService);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbindAutoGauge(metricService);
+    unbindGauge(metricService);
+    unbindStageTimer(metricService);
+
+    // release corresponding resource
+    AirReplicationSyncLagManager.release(impl.getConsensusGroupId());
+  }
+
+  public void bindGauge(AbstractMetricService metricService) {
+    metricService
+        .getOrCreateGauge(
+            Metric.AIR_REPLICATION_MODE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            IMPL,
+            Tag.TYPE.toString(),
+            "replicateMode")
+        .set(impl.getReplicateMode());
+  }
+
+  public void unbindGauge(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.GAUGE,
+        Metric.PIPE_CONSENSUS_MODE.toString(),
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.TYPE.toString(),
+        "replicateMode");
+  }
+
+  public void bindAutoGauge(AbstractMetricService metricService) {
+    metricService.createAutoGauge(
+        Metric.AIR_REPLICATION.toString(),
+        MetricLevel.IMPORTANT,
+        syncLagManager,
+        PipeConsensusSyncLagManager::calculateSyncLag,
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.REGION.toString(),
+        impl.getConsensusGroupId(),
+        Tag.TYPE.toString(),
+        "syncLag");

Review Comment:
   `createAutoGauge(...)` is passed `syncLagManager` (an 
`AirReplicationSyncLagManager`), but the method reference is 
`PipeConsensusSyncLagManager::calculateSyncLag`. That method reference points 
to a different class and won’t type-check/compile. It should reference 
`AirReplicationSyncLagManager::calculateSyncLag` (or an equivalent method on 
`syncLagManager`).



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/airreplication/metric/AirReplicationServerMetrics.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.air.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.consensus.air.AirReplicationServerImpl;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Timer;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class AirReplicationServerMetrics implements IMetricSet {
+  private final AirReplicationServerImpl impl;
+  private final AirReplicationSyncLagManager syncLagManager;
+
+  private Timer getStateMachineLockTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer userWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+  private Timer replicaWriteStateMachineTimer = 
DoNothingMetricManager.DO_NOTHING_TIMER;
+
+  public AirReplicationServerMetrics(AirReplicationServerImpl impl) {
+    this.impl = impl;
+    this.syncLagManager = 
AirReplicationSyncLagManager.getInstance(impl.getConsensusGroupId());
+  }
+
+  private static final String IMPL = "AirReplicationServerImpl";
+
+  public void recordGetStateMachineLockTime(long costTimeInNanos) {
+    getStateMachineLockTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordUserWriteStateMachineTime(long costTimeInNanos) {
+    userWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  public void recordReplicaWriteStateMachineTime(long costTimeInNanos) {
+    replicaWriteStateMachineTimer.updateNanos(costTimeInNanos);
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    bindAutoGauge(metricService);
+    bindGauge(metricService);
+    bindStageTimer(metricService);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    unbindAutoGauge(metricService);
+    unbindGauge(metricService);
+    unbindStageTimer(metricService);
+
+    // release corresponding resource
+    AirReplicationSyncLagManager.release(impl.getConsensusGroupId());
+  }
+
+  public void bindGauge(AbstractMetricService metricService) {
+    metricService
+        .getOrCreateGauge(
+            Metric.AIR_REPLICATION_MODE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            IMPL,
+            Tag.TYPE.toString(),
+            "replicateMode")
+        .set(impl.getReplicateMode());
+  }
+
+  public void unbindGauge(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.GAUGE,
+        Metric.PIPE_CONSENSUS_MODE.toString(),
+        Tag.NAME.toString(),
+        IMPL,
+        Tag.TYPE.toString(),
+        "replicateMode");
+  }

Review Comment:
   `unbindGauge` removes `Metric.PIPE_CONSENSUS_MODE`, but `bindGauge` created 
a gauge under `Metric.AIR_REPLICATION_MODE`. Whichever metric key is kept, 
`bindGauge` and `unbindGauge` must use the same one; otherwise the gauge will 
leak and never be removed.



##########
iotdb-core/ainode/standalone_finetune/finetune/task/task_info.py:
##########
@@ -0,0 +1,275 @@
+import os
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Any, Dict, Optional
+
+from standalone_finetune.config import get_exp_dir, get_models_dir
+from standalone_finetune.model.model_constants import ModelCategory, ModelStates
+from standalone_finetune.model.model_info import ModelInfo
+from standalone_finetune.finetune.task.task_constants import (
+    CancelReason,
+    TaskError,
+    TaskProgress,
+    TaskStatus,
+)
+from standalone_finetune.hparams.data_args import DataArguments
+from standalone_finetune.hparams.finetune_args import FinetuneArguments
+from standalone_finetune.hparams.model_args import ModelArguments
+from standalone_finetune.hparams.training_args import TrainingArguments
+
+
+@dataclass
+class FinetuneTask:
+    task_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
+    base_model_info: Optional[ModelInfo] = field(default=None)
+    model_args: ModelArguments = field(default_factory=ModelArguments)
+    data_args: DataArguments = field(default_factory=DataArguments)
+    training_args: TrainingArguments = field(default_factory=TrainingArguments)
+    finetune_args: FinetuneArguments = field(default_factory=FinetuneArguments)
+
+    status: TaskStatus = field(default=TaskStatus.PENDING)
+    priority: str = field(default="normal")
+
+    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
+    started_at: Optional[str] = field(default=None)
+    completed_at: Optional[str] = field(default=None)
+
+    progress: TaskProgress = field(default_factory=TaskProgress)
+    error: Optional[TaskError] = field(default=None)
+
+    pid: Optional[int] = field(default=None)
+    cancel_reason: Optional[CancelReason] = field(default=None)
+
+    @property
+    def exp_dir(self) -> str:
+        return os.path.join(get_exp_dir(), self.task_id)
+
+    @property
+    def output_model_dir(self) -> str:
+        return os.path.join(
+            get_models_dir(), ModelCategory.FINE_TUNED.value, self.model_args.model_id
+        )
+
+    @classmethod
+    def create(
+        cls,
+        base_model_info: ModelInfo,
+        model_args: ModelArguments,
+        data_args: DataArguments,
+        training_args: TrainingArguments,
+        finetune_args: FinetuneArguments,
+    ) -> "FinetuneTask":
+        task_id = model_args.model_id or str(uuid.uuid4())[:8]
+        return cls(
+            task_id=task_id,
+            base_model_info=base_model_info,
+            model_args=model_args,
+            data_args=data_args,
+            training_args=training_args,
+            finetune_args=finetune_args,
+            priority=finetune_args.priority,
+        )
+
+    def start(self, pid: Optional[int] = None) -> None:
+        self.status = TaskStatus.RUNNING
+        self.started_at = datetime.now().isoformat()
+        self.pid = pid
+
+    def complete(self, success: bool = True, error_msg: str = "") -> None:
+        self.status = TaskStatus.SUCCEEDED if success else TaskStatus.FAILED
+        self.completed_at = datetime.now().isoformat()
+        if not success and error_msg:
+            self.error = TaskError(error_message=error_msg)
+
+    def cancel(self, reason: CancelReason = CancelReason.USER_REQUEST) -> None:
+        self.status = TaskStatus.CANCELED
+        self.completed_at = datetime.now().isoformat()
+        self.cancel_reason = reason
+
+    def update_progress(self, **kwargs) -> None:
+        for key, value in kwargs.items():
+            if hasattr(self.progress, key):
+                setattr(self.progress, key, value)
+
+    @property
+    def is_terminal(self) -> bool:
+        return self.status in (
+            TaskStatus.SUCCEEDED,
+            TaskStatus.FAILED,
+            TaskStatus.CANCELED,
+        )
+
+    @property
+    def elapsed_seconds(self) -> Optional[float]:
+        """Wall-clock seconds since task started (or total if finished)."""
+        if self.started_at is None:
+            return None
+        start = datetime.fromisoformat(self.started_at)
+        end = (
+            datetime.fromisoformat(self.completed_at)
+            if self.completed_at
+            else datetime.now()
+        )
+        return (end - start).total_seconds()
+
+    def to_summary(self) -> str:
+        elapsed = self.elapsed_seconds
+        elapsed_str = f"{elapsed:.0f}s" if elapsed is not None else "-"
+        loss_str = (
+            f"loss={self.progress.train_loss:.4f}"
+            if self.progress.train_loss > 0
+            else ""
+        )
+        return (
+            f"Task: {self.task_id} | Status: {self.status.value} | "
+            f"Type: {self.finetune_args.finetune_type} | "
+            f"Progress: {self.progress.current_epoch}/{self.progress.total_epochs} | "
+            f"Elapsed: {elapsed_str}" + (f" | {loss_str}" if loss_str else "")
+        )
+
+    def to_detail(self) -> str:
+        lines = [
+            f"Task ID: {self.task_id}",
+            f"Status: {self.status.value}",
+            f"Priority: {self.priority}",
+            f"Finetune Type: {self.finetune_args.finetune_type}",
+            f"Model Type: {self.model_args.model_type}",
+            f"Base Model: {self.model_args.base_model_id}",
+            f"Created: {self.created_at}",
+        ]
+        if self.started_at:
+            lines.append(f"Started: {self.started_at}")
+        if self.completed_at:
+            lines.append(f"Completed: {self.completed_at}")
+        elapsed = self.elapsed_seconds
+        if elapsed is not None:
+            lines.append(f"Elapsed: {elapsed:.1f}s")
+        if self.progress.total_epochs > 0:
+            lines.append(
+                f"Progress: Epoch {self.progress.current_epoch}/{self.progress.total_epochs}, "
+                f"Step {self.progress.current_step}/{self.progress.total_steps}"
+            )
+            if self.progress.train_loss > 0:
+                lines.append(f"Train Loss: {self.progress.train_loss:.6f}")
+            if self.progress.val_loss > 0:
+                lines.append(f"Val Loss: {self.progress.val_loss:.6f}")
+        if self.error:
+            lines.append(f"Error: {self.error.error_message}")
+        return "\n".join(lines)
+
+    def to_dict(self) -> Dict[str, Any]:
+        base_model_info_dict = None
+        if self.base_model_info is not None:
+            base_model_info_dict = {
+                "model_id": self.base_model_info.model_id,
+                "model_type": self.base_model_info.model_type,
+                "category": self.base_model_info.category.value,
+                "state": self.base_model_info.state.value,
+                "pipeline_cls": self.base_model_info.pipeline_cls,
+                "repo_id": self.base_model_info.repo_id,
+                "auto_map": self.base_model_info.auto_map,
+                "transformers_registered": self.base_model_info.transformers_registered,
+            }
+        return {
+            "task_id": self.task_id,
+            "base_model_info": base_model_info_dict,
+            "model_args": self.model_args.to_dict(),
+            "data_args": self.data_args.to_dict(),
+            "training_args": self.training_args.to_dict(),
+            "finetune_args": self.finetune_args.to_dict(),
+            "status": self.status.value,
+            "priority": self.priority,
+            "created_at": self.created_at,
+            "started_at": self.started_at,
+            "completed_at": self.completed_at,
+            "progress": {
+                "current_epoch": self.progress.current_epoch,
+                "total_epochs": self.progress.total_epochs,
+                "current_step": self.progress.current_step,
+                "total_steps": self.progress.total_steps,
+                "train_loss": self.progress.train_loss,
+                "val_loss": self.progress.val_loss,
+            },
+            "error": (
+                {
+                    "error_type": self.error.error_type,
+                    "error_message": self.error.error_message,
+                    "stack_trace": self.error.stack_trace,
+                }
+                if self.error
+                else None
+            ),
+            "pid": self.pid,
+            "cancel_reason": self.cancel_reason.value if self.cancel_reason else None,
+        }
+
+    @classmethod
+    def from_dict(cls, data: Dict[str, Any]) -> "FinetuneTask":
+        base_model_info = None
+        base_model_info_data = data.get("base_model_info")
+        if base_model_info_data:
+            base_model_info = ModelInfo(
+                model_id=base_model_info_data.get("model_id", ""),
+                model_type=base_model_info_data.get("model_type", ""),
+                category=ModelCategory(
+                    base_model_info_data.get("category", "FINETUNED")
+                ),
+                state=ModelStates(base_model_info_data.get("state", "INACTIVE")),
+                pipeline_cls=base_model_info_data.get("pipeline_cls", ""),

Review Comment:
   `FinetuneTask.from_dict()` uses defaults `"FINETUNED"` and `"INACTIVE"` when 
reconstructing `ModelCategory`/`ModelStates`, but the enums are defined as 
lowercase values (`"fine_tuned"`, `"inactive"`, etc.). These defaults will 
raise `ValueError` during deserialization whenever the `category` or `state` 
key is missing from the serialized `base_model_info`. Use valid enum values 
(or map/normalize case) when providing defaults.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to