jt2594838 commented on code in PR #16435:
URL: https://github.com/apache/iotdb/pull/16435#discussion_r2422549283
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java:
##########
@@ -609,7 +617,43 @@ public Iterable<TabletInsertionEvent>
toTabletInsertionEvents(final long timeout
return Collections.emptyList();
}
waitForResourceEnough4Parsing(timeoutMs);
- return initEventParser().toTabletInsertionEvents();
+ return () -> {
+ final Iterator<TsFileInsertionEventParser> parserIterator =
initEventParsers().iterator();
+ return new Iterator<TabletInsertionEvent>() {
+ private TsFileInsertionEventParser currentParser = null;
+ private Iterator<TabletInsertionEvent> currentEventIterator =
Collections.emptyIterator();
+
+ private void closeCurrentParser() {
+ if (Objects.nonNull(currentParser)) {
+ currentParser.close();
+ currentParser = null;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (!currentEventIterator.hasNext() &&
parserIterator.hasNext()) {
+ closeCurrentParser();
+ currentParser = parserIterator.next();
+ currentEventIterator =
currentParser.toTabletInsertionEvents().iterator();
+ }
+
+ if (!currentEventIterator.hasNext()) {
+ closeCurrentParser();
+ }
+
+ return currentEventIterator.hasNext();
+ }
+
+ @Override
+ public TabletInsertionEvent next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return currentEventIterator.next();
+ }
+ };
+ };
Review Comment:
I would suggest putting all the patterns inside a single parser, instead of
creating one parser per pattern, because:
1. overlapping patterns, like root.db1.** and root.db1.d1.**, may produce
redundant results;
2. each pattern may result in a separate traversal of the TsFile, which could
be inefficient.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java:
##########
@@ -406,13 +413,19 @@ public EnrichedEvent getSourceEvent() {
@Override
public Iterable<TabletInsertionEvent> processRowByRow(
final BiConsumer<Row, RowCollector> consumer) {
- return initEventParser().processRowByRow(consumer);
+ return initEventParsers().stream()
+ .map(tabletInsertionEventParser ->
tabletInsertionEventParser.processRowByRow(consumer))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
}
@Override
public Iterable<TabletInsertionEvent> processTablet(
final BiConsumer<Tablet, RowCollector> consumer) {
- return initEventParser().processTablet(consumer);
+ return initEventParsers().stream()
+ .map(tabletInsertionEventParser ->
tabletInsertionEventParser.processTablet(consumer))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
}
Review Comment:
Will an insertion be sent twice if I define overlapping patterns like:
root.db1.d1.s1
root.db1.d1.*
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java:
##########
@@ -68,48 +72,97 @@ public boolean isRoot() {
*
* @return The interpreted {@link TreePattern} which is not {@code null}.
*/
- public static TreePattern parsePipePatternFromSourceParameters(
+ public static List<TreePattern> parsePipePatternFromSourceParameters(
final PipeParameters sourceParameters) {
final boolean isTreeModelDataAllowedToBeCaptured =
isTreeModelDataAllowToBeCaptured(sourceParameters);
final String path = sourceParameters.getStringByKeys(EXTRACTOR_PATH_KEY,
SOURCE_PATH_KEY);
+ final String pattern =
+ sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY);
- // 1. If "source.path" is specified, it will be interpreted as an
IoTDB-style path,
- // ignoring the other 2 parameters.
- if (path != null) {
- return new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, path);
+ // 1. If both "source.path" and "source.pattern" are specified, their
union will be used.
+ if (path != null && pattern != null) {
+ final List<TreePattern> result = new ArrayList<>();
+ // Parse "source.path" as IoTDB-style path.
+ result.addAll(
+ parseMultiplePatterns(
+ path, p -> new
IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)));
+ // Parse "source.pattern" using the helper method.
+ result.addAll(
+ parsePatternsFromPatternParameter(
+ pattern, sourceParameters, isTreeModelDataAllowedToBeCaptured));
+ return result;
}
- final String pattern =
- sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_KEY,
SOURCE_PATTERN_KEY);
+ // 2. If only "source.path" is specified, it will be interpreted as an
IoTDB-style path.
+ if (path != null) {
+ return parseMultiplePatterns(
+ path, p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured,
p));
+ }
- // 2. Otherwise, If "source.pattern" is specified, it will be interpreted
- // according to "source.pattern.format".
+ // 3. If only "source.pattern" is specified, parse it using the helper
method.
if (pattern != null) {
- final String patternFormat =
- sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY,
SOURCE_PATTERN_FORMAT_KEY);
+ return parsePatternsFromPatternParameter(
+ pattern, sourceParameters, isTreeModelDataAllowedToBeCaptured);
+ }
- // If "source.pattern.format" is not specified, use prefix format by
default.
- if (patternFormat == null) {
- return new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured,
pattern);
- }
+ // 4. If neither "source.path" nor "source.pattern" is specified,
+ // this pipe source will match all data.
+ return Collections.singletonList(
+ new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, null));
+ }
- switch (patternFormat.toLowerCase()) {
- case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE:
- return new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured,
pattern);
- case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE:
- return new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured,
pattern);
- default:
- LOGGER.info(
- "Unknown pattern format: {}, use prefix matching format by
default.", patternFormat);
- return new PrefixTreePattern(isTreeModelDataAllowedToBeCaptured,
pattern);
- }
+ /**
+ * A private helper method to parse a list of {@link TreePattern}s from the
"pattern" parameter,
+ * considering its "format".
+ *
+ * @param pattern The pattern string to parse.
+ * @param sourceParameters The source parameters to read the format from.
+ * @param isTreeModelDataAllowedToBeCaptured A boolean flag passed to the
TreePattern constructor.
+ * @return A list of parsed {@link TreePattern}s.
+ */
+ private static List<TreePattern> parsePatternsFromPatternParameter(
+ final String pattern,
+ final PipeParameters sourceParameters,
+ final boolean isTreeModelDataAllowedToBeCaptured) {
+ final String patternFormat =
+ sourceParameters.getStringByKeys(EXTRACTOR_PATTERN_FORMAT_KEY,
SOURCE_PATTERN_FORMAT_KEY);
+
+ // If "source.pattern.format" is not specified, use prefix format by
default.
+ if (patternFormat == null) {
+ return parseMultiplePatterns(
+ pattern, p -> new
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
}
- // 3. If neither "source.path" nor "source.pattern" is specified,
- // this pipe source will match all data.
- return new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, null);
+ switch (patternFormat.toLowerCase()) {
+ case EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE:
+ return parseMultiplePatterns(
+ pattern, p -> new
IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+ case EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE:
+ return parseMultiplePatterns(
+ pattern, p -> new
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+ default:
+ LOGGER.info(
+ "Unknown pattern format: {}, use prefix matching format by
default.", patternFormat);
+ return parseMultiplePatterns(
+ pattern, p -> new
PrefixTreePattern(isTreeModelDataAllowedToBeCaptured, p));
+ }
+ }
+
+ private static List<TreePattern> parseMultiplePatterns(
+ final String pattern, final Function<String, TreePattern>
patternSupplier) {
+ if (pattern.isEmpty()) {
+ return Collections.singletonList(patternSupplier.apply(pattern));
+ }
+ final List<TreePattern> patterns = new ArrayList<>();
+ // Support comma-separated multiple patterns
+ for (final String singlePattern : pattern.split(",")) {
+ if (!singlePattern.trim().isEmpty()) {
+ patterns.add(patternSupplier.apply(singlePattern.trim()));
+ }
+ }
+ return patterns;
Review Comment:
Oops, how about paths like root.db1.\`a,b\`.**? Splitting on every comma would
also split the comma inside the backquoted node name.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]