Caideyipi commented on code in PR #14253:
URL: https://github.com/apache/iotdb/pull/14253#discussion_r1899885865


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java:
##########
@@ -487,8 +489,17 @@ public Iterable<TabletInsertionEvent> 
toTabletInsertionEvents(final long timeout
             "Pipe skipping temporary TsFile's parsing which shouldn't be 
transferred: {}", tsFile);
         return Collections.emptyList();
       }
+
       waitForResourceEnough4Parsing(timeoutMs);
-      return initEventParser().toTabletInsertionEvents();
+      Iterable<TabletInsertionEvent> events = 
initEventParser().toTabletInsertionEvents();
+      if (pipeName != null) {
+        events = new TabletInsertionEventIterable(events, this);
+        final PipeTsFileToTabletMetrics.PipeID pipeID =
+            PipeTsFileToTabletMetrics.PipeID.getPipeID(pipeName, creationTime);
+        PipeTsFileToTabletMetrics.getInstance().register(pipeID);

Review Comment:
   Seemingly this may cause a deregistered metric appear again... Same problem 
with the RemainingEventAndTimeOperator and @luoluoyuyu is handling this.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TabletInsertionEventIterable.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser;
+
+import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.PipeTsFileToTabletMetrics;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import java.util.Iterator;
+
+public class TabletInsertionEventIterable implements 
Iterable<TabletInsertionEvent> {
+  private final Iterable<TabletInsertionEvent> originalIterable;
+  private int count = 0;
+  private boolean isMarked = false;
+  private final PipeInsertionEvent sourceEvent;
+
+  public TabletInsertionEventIterable(
+      Iterable<TabletInsertionEvent> originalIterable, PipeInsertionEvent 
sourceEvent) {
+    this.originalIterable = originalIterable;
+    this.sourceEvent = sourceEvent;
+  }
+
+  @Override
+  public Iterator<TabletInsertionEvent> iterator() {
+    return new Iterator<TabletInsertionEvent>() {
+      private final Iterator<TabletInsertionEvent> originalIterator = 
originalIterable.iterator();
+
+      @Override
+      public boolean hasNext() {
+        boolean hasNext = originalIterator.hasNext();
+        if (!hasNext && !isMarked) {
+          isMarked = true;
+          if (sourceEvent != null) {
+            PipeTsFileToTabletMetrics.getInstance()
+                .markTabletCount(
+                    PipeTsFileToTabletMetrics.PipeID.getPipeID(
+                        sourceEvent.getPipeName(), 
sourceEvent.getCreationTime()),
+                    count);

Review Comment:
   Only needs to record "n"? Now it seems as if this will record 0 ~ n-1.....



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeTsFileToTabletMetrics.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeTsFileToTabletMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileToTabletMetrics.class);
+
+  @SuppressWarnings("java:S3077")
+  private volatile AbstractMetricService metricService;
+
+  private final Map<String, Set<PipeID>> pipeIDMap = new ConcurrentHashMap<>();
+  private final Map<PipeID, Rate> tsFileSizeMap = new ConcurrentHashMap<>();
+  private final Map<PipeID, Rate> tabletCountMap = new ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(final AbstractMetricService metricService) {
+    this.metricService = metricService;
+    final ImmutableSet<String> pipeFullNames = 
ImmutableSet.copyOf(pipeIDMap.keySet());
+    for (final String pipeFullName : pipeFullNames) {
+      for (final PipeID pipeID : pipeIDMap.get(pipeFullName)) {
+        createMetrics(pipeID);
+      }
+    }
+  }
+
+  private void createMetrics(final PipeID pipeID) {
+    createRate(pipeID);
+  }
+
+  private void createRate(final PipeID pipeID) {
+    tsFileSizeMap.put(
+        pipeID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_TSFILETOTABLET_TSFILE_SIZE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID.getPipeName(),
+            Tag.CREATION_TIME.toString(),
+            pipeID.getCreationTime(),
+            Tag.FROM.toString(),
+            pipeID.getCallerName()));
+    tabletCountMap.put(
+        pipeID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_TSFILETOTABLET_TABLET_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID.getPipeName(),
+            Tag.CREATION_TIME.toString(),
+            pipeID.getCreationTime(),
+            Tag.FROM.toString(),
+            pipeID.getCallerName()));
+  }
+
+  @Override
+  public void unbindFrom(final AbstractMetricService metricService) {
+    final ImmutableSet<String> pipeFullNames = 
ImmutableSet.copyOf(pipeIDMap.keySet());
+    for (final String pipeFullName : pipeFullNames) {
+      deregister(pipeFullName);
+    }
+    if (!pipeIDMap.isEmpty()) {
+      LOGGER.warn("Failed to unbind from pipeTsFileToTablet metrics,  
pipeIDMap not empty");
+    }
+  }
+
+  private void removeMetrics(final PipeID pipeID) {
+    removeRate(pipeID);
+  }
+
+  private void removeRate(final PipeID pipeID) {
+    tsFileSizeMap.remove(pipeID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_TSFILETOTABLET_TSFILE_SIZE.toString(),
+        Tag.NAME.toString(),
+        pipeID.getPipeName(),
+        Tag.CREATION_TIME.toString(),
+        pipeID.getCreationTime(),
+        Tag.FROM.toString(),
+        pipeID.getCallerName());
+    tabletCountMap.remove(pipeID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_TSFILETOTABLET_TABLET_COUNT.toString(),
+        Tag.NAME.toString(),
+        pipeID.getPipeName(),
+        Tag.CREATION_TIME.toString(),
+        pipeID.getCreationTime(),
+        Tag.FROM.toString(),
+        pipeID.getCallerName());
+  }
+
+  //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
+
+  public void register(@NonNull final PipeID pipeID) {
+    pipeIDMap.putIfAbsent(
+        pipeID.getPipeFullName(), Collections.newSetFromMap(new 
ConcurrentHashMap<>()));
+    if (pipeIDMap.get(pipeID.getPipeFullName()).add(pipeID) && 
Objects.nonNull(metricService)) {
+      createMetrics(pipeID);
+    }
+  }
+
+  public void deregister(final String pipeFullName) {
+    if (!pipeIDMap.containsKey(pipeFullName)) {
+      return;
+    }
+    if (Objects.nonNull(metricService)) {
+      for (PipeID pipeID : pipeIDMap.get(pipeFullName)) {
+        removeMetrics(pipeID);
+      }
+    }
+    pipeIDMap.remove(pipeFullName);
+  }
+
+  public void markTsFileSize(final PipeID pipeID, final long size) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Rate rate = tsFileSizeMap.get(pipeID);
+    if (rate == null) {
+      LOGGER.info("Failed to mark pipe tsfile size, PipeID({}) does not 
exist", pipeID);
+      return;
+    }
+    rate.mark(size);

Review Comment:
   Is it OK to use "rate"? If we use "rate" we may concentrate on "TsFileSize / 
s" and "TabletCount / s", rather than "per tsFile"... Seemingly we only need to 
record the last value? If we want to know the average of the values, or any 
other staticstics, the Grafana may help us to do this.
   



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeTsFileToTabletMetrics.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeTsFileToTabletMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileToTabletMetrics.class);
+
+  @SuppressWarnings("java:S3077")
+  private volatile AbstractMetricService metricService;
+
+  private final Map<String, Set<PipeID>> pipeIDMap = new ConcurrentHashMap<>();
+  private final Map<PipeID, Rate> tsFileSizeMap = new ConcurrentHashMap<>();
+  private final Map<PipeID, Rate> tabletCountMap = new ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(final AbstractMetricService metricService) {
+    this.metricService = metricService;
+    final ImmutableSet<String> pipeFullNames = 
ImmutableSet.copyOf(pipeIDMap.keySet());
+    for (final String pipeFullName : pipeFullNames) {
+      for (final PipeID pipeID : pipeIDMap.get(pipeFullName)) {
+        createMetrics(pipeID);
+      }
+    }
+  }
+
+  private void createMetrics(final PipeID pipeID) {
+    createRate(pipeID);
+  }
+
+  private void createRate(final PipeID pipeID) {
+    tsFileSizeMap.put(
+        pipeID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_TSFILETOTABLET_TSFILE_SIZE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID.getPipeName(),
+            Tag.CREATION_TIME.toString(),
+            pipeID.getCreationTime(),
+            Tag.FROM.toString(),
+            pipeID.getCallerName()));
+    tabletCountMap.put(
+        pipeID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_TSFILETOTABLET_TABLET_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID.getPipeName(),
+            Tag.CREATION_TIME.toString(),
+            pipeID.getCreationTime(),
+            Tag.FROM.toString(),
+            pipeID.getCallerName()));
+  }
+
+  @Override
+  public void unbindFrom(final AbstractMetricService metricService) {
+    final ImmutableSet<String> pipeFullNames = 
ImmutableSet.copyOf(pipeIDMap.keySet());
+    for (final String pipeFullName : pipeFullNames) {
+      deregister(pipeFullName);
+    }
+    if (!pipeIDMap.isEmpty()) {
+      LOGGER.warn("Failed to unbind from pipeTsFileToTablet metrics,  
pipeIDMap not empty");
+    }
+  }
+
+  private void removeMetrics(final PipeID pipeID) {
+    removeRate(pipeID);
+  }
+
+  private void removeRate(final PipeID pipeID) {
+    tsFileSizeMap.remove(pipeID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_TSFILETOTABLET_TSFILE_SIZE.toString(),
+        Tag.NAME.toString(),
+        pipeID.getPipeName(),
+        Tag.CREATION_TIME.toString(),
+        pipeID.getCreationTime(),
+        Tag.FROM.toString(),
+        pipeID.getCallerName());
+    tabletCountMap.remove(pipeID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_TSFILETOTABLET_TABLET_COUNT.toString(),
+        Tag.NAME.toString(),
+        pipeID.getPipeName(),
+        Tag.CREATION_TIME.toString(),
+        pipeID.getCreationTime(),
+        Tag.FROM.toString(),
+        pipeID.getCallerName());
+  }
+
+  //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
+
+  public void register(@NonNull final PipeID pipeID) {
+    pipeIDMap.putIfAbsent(
+        pipeID.getPipeFullName(), Collections.newSetFromMap(new 
ConcurrentHashMap<>()));
+    if (pipeIDMap.get(pipeID.getPipeFullName()).add(pipeID) && 
Objects.nonNull(metricService)) {
+      createMetrics(pipeID);
+    }
+  }
+
+  public void deregister(final String pipeFullName) {
+    if (!pipeIDMap.containsKey(pipeFullName)) {
+      return;
+    }
+    if (Objects.nonNull(metricService)) {
+      for (PipeID pipeID : pipeIDMap.get(pipeFullName)) {
+        removeMetrics(pipeID);
+      }
+    }
+    pipeIDMap.remove(pipeFullName);
+  }
+
+  public void markTsFileSize(final PipeID pipeID, final long size) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Rate rate = tsFileSizeMap.get(pipeID);
+    if (rate == null) {
+      LOGGER.info("Failed to mark pipe tsfile size, PipeID({}) does not 
exist", pipeID);
+      return;
+    }
+    rate.mark(size);
+  }
+
+  public void markTabletCount(final PipeID pipeID, final long count) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Rate rate = tabletCountMap.get(pipeID);
+    if (rate == null) {
+      LOGGER.info("Failed to mark pipe tablet count, PipeID({}) does not 
exist", pipeID);
+      return;
+    }
+    rate.mark(count);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeTsFileToTabletMetricsHolder {
+
+    private static final PipeTsFileToTabletMetrics INSTANCE = new 
PipeTsFileToTabletMetrics();
+
+    private PipeTsFileToTabletMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeTsFileToTabletMetrics getInstance() {
+    return PipeTsFileToTabletMetrics.PipeTsFileToTabletMetricsHolder.INSTANCE;
+  }
+
+  private PipeTsFileToTabletMetrics() {
+    // empty constructor
+  }
+
+  public static class PipeID {
+    private final String pipeName;
+    private final String creationTime;
+    private final String callerName;
+
+    public PipeID(String pipeName, String creationTime, String callerName) {
+      this.pipeName = pipeName;
+      this.creationTime = creationTime;
+      this.callerName = callerName;
+    }
+
+    public static PipeID getPipeID(final String pipeName, final long 
creationTime) {
+      String callerClassName = 
Thread.currentThread().getStackTrace()[3].getClassName();
+      String callerMethodName = 
Thread.currentThread().getStackTrace()[3].getMethodName();
+      if (callerMethodName.equals("toTabletInsertionEvents")) {
+        callerClassName = 
Thread.currentThread().getStackTrace()[4].getClassName();
+        callerMethodName = 
Thread.currentThread().getStackTrace()[4].getMethodName();
+      }
+      return new PipeTsFileToTabletMetrics.PipeID(
+          pipeName,
+          String.valueOf(creationTime),
+          callerClassName.substring(callerClassName.lastIndexOf('.') + 1) + 
":" + callerMethodName);
+    }
+
+    public String getPipeName() {
+      return pipeName;
+    }
+
+    public String getCreationTime() {
+      return creationTime;
+    }
+
+    public String getCallerName() {
+      return callerName;
+    }
+
+    public String getPipeFullName() {
+      return pipeName + "_" + creationTime;

Review Comment:
   Consider PipeTaskAgent#getPipeNameWithCreationTime, where this string is 
cached and need not to be constructed each time.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java:
##########
@@ -487,8 +489,17 @@ public Iterable<TabletInsertionEvent> 
toTabletInsertionEvents(final long timeout
             "Pipe skipping temporary TsFile's parsing which shouldn't be 
transferred: {}", tsFile);
         return Collections.emptyList();
       }
+
       waitForResourceEnough4Parsing(timeoutMs);
-      return initEventParser().toTabletInsertionEvents();
+      Iterable<TabletInsertionEvent> events = 
initEventParser().toTabletInsertionEvents();
+      if (pipeName != null) {
+        events = new TabletInsertionEventIterable(events, this);
+        final PipeTsFileToTabletMetrics.PipeID pipeID =
+            PipeTsFileToTabletMetrics.PipeID.getPipeID(pipeName, creationTime);
+        PipeTsFileToTabletMetrics.getInstance().register(pipeID);

Review Comment:
   See this PR: https://github.com/apache/iotdb/pull/14562/files



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeTsFileToTabletMetrics.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.metric;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Rate;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+import com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeTsFileToTabletMetrics implements IMetricSet {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileToTabletMetrics.class);
+
+  @SuppressWarnings("java:S3077")
+  private volatile AbstractMetricService metricService;
+
+  private final Map<String, Set<PipeID>> pipeIDMap = new ConcurrentHashMap<>();
+  private final Map<PipeID, Rate> tsFileSizeMap = new ConcurrentHashMap<>();
+  private final Map<PipeID, Rate> tabletCountMap = new ConcurrentHashMap<>();
+
+  //////////////////////////// bindTo & unbindFrom (metric framework) 
////////////////////////////
+
+  @Override
+  public void bindTo(final AbstractMetricService metricService) {
+    this.metricService = metricService;
+    final ImmutableSet<String> pipeFullNames = 
ImmutableSet.copyOf(pipeIDMap.keySet());
+    for (final String pipeFullName : pipeFullNames) {
+      for (final PipeID pipeID : pipeIDMap.get(pipeFullName)) {
+        createMetrics(pipeID);
+      }
+    }
+  }
+
+  private void createMetrics(final PipeID pipeID) {
+    createRate(pipeID);
+  }
+
+  private void createRate(final PipeID pipeID) {
+    tsFileSizeMap.put(
+        pipeID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_TSFILETOTABLET_TSFILE_SIZE.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID.getPipeName(),
+            Tag.CREATION_TIME.toString(),
+            pipeID.getCreationTime(),
+            Tag.FROM.toString(),
+            pipeID.getCallerName()));
+    tabletCountMap.put(
+        pipeID,
+        metricService.getOrCreateRate(
+            Metric.PIPE_TSFILETOTABLET_TABLET_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            pipeID.getPipeName(),
+            Tag.CREATION_TIME.toString(),
+            pipeID.getCreationTime(),
+            Tag.FROM.toString(),
+            pipeID.getCallerName()));
+  }
+
+  @Override
+  public void unbindFrom(final AbstractMetricService metricService) {
+    final ImmutableSet<String> pipeFullNames = 
ImmutableSet.copyOf(pipeIDMap.keySet());
+    for (final String pipeFullName : pipeFullNames) {
+      deregister(pipeFullName);
+    }
+    if (!pipeIDMap.isEmpty()) {
+      LOGGER.warn("Failed to unbind from pipeTsFileToTablet metrics,  
pipeIDMap not empty");
+    }
+  }
+
+  private void removeMetrics(final PipeID pipeID) {
+    removeRate(pipeID);
+  }
+
+  private void removeRate(final PipeID pipeID) {
+    tsFileSizeMap.remove(pipeID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_TSFILETOTABLET_TSFILE_SIZE.toString(),
+        Tag.NAME.toString(),
+        pipeID.getPipeName(),
+        Tag.CREATION_TIME.toString(),
+        pipeID.getCreationTime(),
+        Tag.FROM.toString(),
+        pipeID.getCallerName());
+    tabletCountMap.remove(pipeID);
+    metricService.remove(
+        MetricType.RATE,
+        Metric.PIPE_TSFILETOTABLET_TABLET_COUNT.toString(),
+        Tag.NAME.toString(),
+        pipeID.getPipeName(),
+        Tag.CREATION_TIME.toString(),
+        pipeID.getCreationTime(),
+        Tag.FROM.toString(),
+        pipeID.getCallerName());
+  }
+
+  //////////////////////////// register & deregister (pipe integration) 
////////////////////////////
+
+  public void register(@NonNull final PipeID pipeID) {
+    pipeIDMap.putIfAbsent(
+        pipeID.getPipeFullName(), Collections.newSetFromMap(new 
ConcurrentHashMap<>()));
+    if (pipeIDMap.get(pipeID.getPipeFullName()).add(pipeID) && 
Objects.nonNull(metricService)) {
+      createMetrics(pipeID);
+    }
+  }
+
+  public void deregister(final String pipeFullName) {
+    if (!pipeIDMap.containsKey(pipeFullName)) {
+      return;
+    }
+    if (Objects.nonNull(metricService)) {
+      for (PipeID pipeID : pipeIDMap.get(pipeFullName)) {
+        removeMetrics(pipeID);
+      }
+    }
+    pipeIDMap.remove(pipeFullName);
+  }
+
+  public void markTsFileSize(final PipeID pipeID, final long size) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Rate rate = tsFileSizeMap.get(pipeID);
+    if (rate == null) {
+      LOGGER.info("Failed to mark pipe tsfile size, PipeID({}) does not 
exist", pipeID);
+      return;
+    }
+    rate.mark(size);
+  }
+
+  public void markTabletCount(final PipeID pipeID, final long count) {
+    if (Objects.isNull(metricService)) {
+      return;
+    }
+    final Rate rate = tabletCountMap.get(pipeID);
+    if (rate == null) {
+      LOGGER.info("Failed to mark pipe tablet count, PipeID({}) does not 
exist", pipeID);
+      return;
+    }
+    rate.mark(count);
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeTsFileToTabletMetricsHolder {
+
+    private static final PipeTsFileToTabletMetrics INSTANCE = new 
PipeTsFileToTabletMetrics();
+
+    private PipeTsFileToTabletMetricsHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeTsFileToTabletMetrics getInstance() {
+    return PipeTsFileToTabletMetrics.PipeTsFileToTabletMetricsHolder.INSTANCE;
+  }
+
+  private PipeTsFileToTabletMetrics() {
+    // empty constructor
+  }
+
+  public static class PipeID {
+    private final String pipeName;
+    private final String creationTime;
+    private final String callerName;
+
+    public PipeID(String pipeName, String creationTime, String callerName) {
+      this.pipeName = pipeName;
+      this.creationTime = creationTime;
+      this.callerName = callerName;
+    }
+
+    public static PipeID getPipeID(final String pipeName, final long 
creationTime) {

Review Comment:
   PipeCallerID?



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java:
##########
@@ -172,6 +172,8 @@ public enum Metric {
   PIPE_CONFIGNODE_REMAINING_TIME("pipe_confignode_remaining_time"),
   PIPE_GLOBAL_REMAINING_EVENT_COUNT("pipe_global_remaining_event_count"),
   PIPE_GLOBAL_REMAINING_TIME("pipe_global_remaining_time"),
+  PIPE_TSFILETOTABLET_TSFILE_SIZE("pipe_tsfiletotablet_tsfile_size"),

Review Comment:
   tsfile_to_tablet?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TabletInsertionEventIterable.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser;
+
+import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.PipeTsFileToTabletMetrics;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import java.util.Iterator;
+
+public class TabletInsertionEventIterable implements 
Iterable<TabletInsertionEvent> {
+  private final Iterable<TabletInsertionEvent> originalIterable;
+  private int count = 0;
+  private boolean isMarked = false;
+  private final PipeInsertionEvent sourceEvent;
+
+  public TabletInsertionEventIterable(
+      Iterable<TabletInsertionEvent> originalIterable, PipeInsertionEvent 
sourceEvent) {
+    this.originalIterable = originalIterable;
+    this.sourceEvent = sourceEvent;
+  }
+
+  @Override
+  public Iterator<TabletInsertionEvent> iterator() {
+    return new Iterator<TabletInsertionEvent>() {
+      private final Iterator<TabletInsertionEvent> originalIterator = 
originalIterable.iterator();
+
+      @Override
+      public boolean hasNext() {
+        boolean hasNext = originalIterator.hasNext();
+        if (!hasNext && !isMarked) {
+          isMarked = true;
+          if (sourceEvent != null) {
+            PipeTsFileToTabletMetrics.getInstance()
+                .markTabletCount(
+                    PipeTsFileToTabletMetrics.PipeID.getPipeID(

Review Comment:
   This "getPipeID" may have some string parsing cost... Better cache it 
outside and pass it into this class and do not construct it each time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to