jt2594838 commented on code in PR #16435:
URL: https://github.com/apache/iotdb/pull/16435#discussion_r2422549283


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java:
##########
@@ -609,7 +617,43 @@ public Iterable<TabletInsertionEvent> 
toTabletInsertionEvents(final long timeout
         return Collections.emptyList();
       }
       waitForResourceEnough4Parsing(timeoutMs);
-      return initEventParser().toTabletInsertionEvents();
+      return () -> {
+        final Iterator<TsFileInsertionEventParser> parserIterator = 
initEventParsers().iterator();
+        return new Iterator<TabletInsertionEvent>() {
+          private TsFileInsertionEventParser currentParser = null;
+          private Iterator<TabletInsertionEvent> currentEventIterator = 
Collections.emptyIterator();
+
+          private void closeCurrentParser() {
+            if (Objects.nonNull(currentParser)) {
+              currentParser.close();
+              currentParser = null;
+            }
+          }
+
+          @Override
+          public boolean hasNext() {
+            while (!currentEventIterator.hasNext() && 
parserIterator.hasNext()) {
+              closeCurrentParser();
+              currentParser = parserIterator.next();
+              currentEventIterator = 
currentParser.toTabletInsertionEvents().iterator();
+            }
+
+            if (!currentEventIterator.hasNext()) {
+              closeCurrentParser();
+            }
+
+            return currentEventIterator.hasNext();
+          }
+
+          @Override
+          public TabletInsertionEvent next() {
+            if (!hasNext()) {
+              throw new NoSuchElementException();
+            }
+            return currentEventIterator.next();
+          }
+        };
+      };

Review Comment:
   I would suggest putting all patterns inside a single parser, instead of 
creating one parser per pattern, because:
   1. overlapping patterns, like root.db1.** and root.db1.d1.**, may produce 
redundant results;
   2. each pattern may require a separate traversal of the TsFile, which could 
be inefficient.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java:
##########
@@ -406,13 +413,19 @@ public EnrichedEvent getSourceEvent() {
   @Override
   public Iterable<TabletInsertionEvent> processRowByRow(
       final BiConsumer<Row, RowCollector> consumer) {
-    return initEventParser().processRowByRow(consumer);
+    return initEventParsers().stream()
+        .map(tabletInsertionEventParser -> 
tabletInsertionEventParser.processRowByRow(consumer))
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
   }
 
   @Override
   public Iterable<TabletInsertionEvent> processTablet(
       final BiConsumer<Tablet, RowCollector> consumer) {
-    return initEventParser().processTablet(consumer);
+    return initEventParsers().stream()
+        .map(tabletInsertionEventParser -> 
tabletInsertionEventParser.processTablet(consumer))
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
   }

Review Comment:
   Will an insertion be sent twice if I define overlapping patterns like the 
following?
   root.db1.d1.s1
   root.db1.d1.*



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java:
##########
@@ -68,48 +72,97 @@ public boolean isRoot() {
    *
    * @return The interpreted {@link TreePattern} which is not {@code null}.
    */
-  public static TreePattern parsePipePatternFromSourceParameters(
+  public static List<TreePattern> parsePipePatternFromSourceParameters(
       final PipeParameters sourceParameters) {
     final boolean isTreeModelDataAllowedToBeCaptured =
         isTreeModelDataAllowToBeCaptured(sourceParameters);
 
     final String path = sourceParameters.getStringByKeys(EXTRACTOR_PATH_KEY, 
SOURCE_PATH_KEY);
+    final String pattern =
+        sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY, 
SOURCE_PATTERN_KEY);
 
-    // 1. If "source.path" is specified, it will be interpreted as an 
IoTDB-style path,
-    // ignoring the other 2 parameters.
-    if (path != null) {
-      return new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, path);
+    // 1. If both "source.path" and "source.pattern" are specified, their 
union will be used.
+    if (path != null && pattern != null) {
+      final List<TreePattern> result = new ArrayList<>();
+      // Parse "source.path" as IoTDB-style path.
+      result.addAll(
+          parseMultiplePatterns(
+              path, p -> new 
IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)));
+      // Parse "source.pattern" using the helper method.
+      result.addAll(
+          parsePatternsFromPatternParameter(
+              pattern, sourceParameters, isTreeModelDataAllowedToBeCaptured));
+      return result;
     }
 
-    final String pattern =
-        sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY, 
SOURCE_PATTERN_KEY);
+    // 2. If only "source.path" is specified, it will be interpreted as an 
IoTDB-style path.
+    if (path != null) {
+      return parseMultiplePatterns(
+          path, p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, 
p));
+    }
 
-    // 2. Otherwise, If "source.pattern" is specified, it will be interpreted
-    // according to "source.pattern.format".
+    // 3. If only "source.pattern" is specified, parse it using the helper 
method.
     if (pattern != null) {
-      final String patternFormat =
-          sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY, 
SOURCE_PATTERN_FORMAT_KEY);
+      return parsePatternsFromPatternParameter(
+          pattern, sourceParameters, isTreeModelDataAllowedToBeCaptured);
+    }
 
-      // If "source.pattern.format" is not specified, use prefix format by 
default.
-      if (patternFormat == null) {
-        return new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, 
pattern);
-      }
+    // 4. If neither "source.path" nor "source.pattern" is specified,
+    // this pipe source will match all data.
+    return Collections.singletonList(
+        new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, null));
+  }
 
-      switch (patternFormat.toLowerCase()) {
-        case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE:
-          return new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, 
pattern);
-        case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE:
-          return new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, 
pattern);
-        default:
-          LOGGER.info(
-              "Unknown pattern format: {}, use prefix matching format by 
default.", patternFormat);
-          return new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, 
pattern);
-      }
+  /**
+   * A private helper method to parse a list of {@link TreePattern}s from the 
"pattern" parameter,
+   * considering its "format".
+   *
+   * @param pattern The pattern string to parse.
+   * @param sourceParameters The source parameters to read the format from.
+   * @param isTreeModelDataAllowedToBeCaptured A boolean flag passed to the 
TreePattern constructor.
+   * @return A list of parsed {@link TreePattern}s.
+   */
+  private static List<TreePattern> parsePatternsFromPatternParameter(
+      final String pattern,
+      final PipeParameters sourceParameters,
+      final boolean isTreeModelDataAllowedToBeCaptured) {
+    final String patternFormat =
+        sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY, 
SOURCE_PATTERN_FORMAT_KEY);
+
+    // If "source.pattern.format" is not specified, use prefix format by 
default.
+    if (patternFormat == null) {
+      return parseMultiplePatterns(
+          pattern, p -> new 
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
     }
 
-    // 3. If neither "source.path" nor "source.pattern" is specified,
-    // this pipe source will match all data.
-    return new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, null);
+    switch (patternFormat.toLowerCase()) {
+      case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE:
+        return parseMultiplePatterns(
+            pattern, p -> new 
IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+      case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE:
+        return parseMultiplePatterns(
+            pattern, p -> new 
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+      default:
+        LOGGER.info(
+            "Unknown pattern format: {}, use prefix matching format by 
default.", patternFormat);
+        return parseMultiplePatterns(
+            pattern, p -> new 
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+    }
+  }
+
+  private static List<TreePattern> parseMultiplePatterns(
+      final String pattern, final Function<String, TreePattern> 
patternSupplier) {
+    if (pattern.isEmpty()) {
+      return Collections.singletonList(patternSupplier.apply(pattern));
+    }
+    final List<TreePattern> patterns = new ArrayList<>();
+    // Support comma-separated multiple patterns
+    for (final String singlePattern : pattern.split(",")) {
+      if (!singlePattern.trim().isEmpty()) {
+        patterns.add(patternSupplier.apply(singlePattern.trim()));
+      }
+    }
+    return patterns;

Review Comment:
   Oops, what about paths like root.db1.\`a,b\`.**? Splitting on "," would break a backquoted node name that itself contains a comma.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to