luoluoyuyu commented on code in PR #13167:
URL: https://github.com/apache/iotdb/pull/13167#discussion_r1718135369


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java:
##########
@@ -208,6 +225,58 @@ protected boolean executeOnce() throws Exception {
     return true;
   }
 
+  private synchronized Event filter(final Event event) { // make it 
synchronized
+    if (Objects.isNull(event)) {
+      return null;
+    }
+
+    if (event instanceof PipeRawTabletInsertionEvent) {
+      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+          (PipeRawTabletInsertionEvent) event;
+      final EnrichedEvent sourceEvent = 
pipeRawTabletInsertionEvent.getSourceEvent();
+      if (sourceEvent instanceof PipeTsFileInsertionEvent
+          && isDuplicated((PipeTsFileInsertionEvent) sourceEvent)) {
+        // Discard duplicate event, because pipeConsensus doesn't expect to 
assign a commit id for
+        // duplicate events to ensure that events can be received in order. 
The consequence of this
+        // is that ProgressIndex is not updated in time, but this will not 
affect the correctness.
+        
pipeRawTabletInsertionEvent.clearReferenceCount(PipeProcessorSubtask.class.getName());
+        return null;
+      }
+    }
+
+    if (event instanceof PipeTsFileInsertionEvent) {
+      final PipeTsFileInsertionEvent pipeTsFileInsertionEvent = 
(PipeTsFileInsertionEvent) event;
+      if (isDuplicated(pipeTsFileInsertionEvent)) {
+        // ditto
+        
pipeTsFileInsertionEvent.clearReferenceCount(PipeProcessorSubtask.class.getName());
+        return null;
+      }
+    }
+
+    return event;
+  }
+
+  private boolean isDuplicated(final PipeTsFileInsertionEvent event) {
+    final int hashCode = event.getTsFile().hashCode();
+    final boolean isGeneratedByHistoricalExtractor = 
event.isGeneratedByHistoricalExtractor();
+    final Boolean existedIsGeneratedByHistoricalExtractor =
+        hashCodeToIsGeneratedByHistoricalExtractor.getIfPresent(hashCode);
+    if (Objects.isNull(existedIsGeneratedByHistoricalExtractor)) {
+      hashCodeToIsGeneratedByHistoricalExtractor.put(hashCode, 
isGeneratedByHistoricalExtractor);
+      return false;
+    }
+    // Multiple PipeRawTabletInsertionEvents parsed from the same 
PipeTsFileInsertionEvent (i.e.,
+    // with the same isGeneratedByHistoricalExtractor field) are not 
considered duplicates.
+    if (Objects.equals(existedIsGeneratedByHistoricalExtractor, 
isGeneratedByHistoricalExtractor)) {

Review Comment:
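   Suggestion: `existedIsGeneratedByHistoricalExtractor` has already been null-checked above, so a plain `==` comparison against the primitive `boolean` (which auto-unboxes the `Boolean`) is sufficient here instead of `Objects.equals`:
   `if (existedIsGeneratedByHistoricalExtractor == isGeneratedByHistoricalExtractor) {`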



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java:
##########
@@ -208,6 +225,58 @@ protected boolean executeOnce() throws Exception {
     return true;
   }
 
+  private synchronized Event filter(final Event event) { // make it 
synchronized
+    if (Objects.isNull(event)) {
+      return null;
+    }
+
+    if (event instanceof PipeRawTabletInsertionEvent) {
+      final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+          (PipeRawTabletInsertionEvent) event;
+      final EnrichedEvent sourceEvent = 
pipeRawTabletInsertionEvent.getSourceEvent();
+      if (sourceEvent instanceof PipeTsFileInsertionEvent
+          && isDuplicated((PipeTsFileInsertionEvent) sourceEvent)) {
+        // Discard duplicate event, because pipeConsensus doesn't expect to 
assign a commit id for
+        // duplicate events to ensure that events can be received in order. 
The consequence of this
+        // is that ProgressIndex is not updated in time, but this will not 
affect the correctness.
+        
pipeRawTabletInsertionEvent.clearReferenceCount(PipeProcessorSubtask.class.getName());
+        return null;
+      }
+    }
+
+    if (event instanceof PipeTsFileInsertionEvent) {
+      final PipeTsFileInsertionEvent pipeTsFileInsertionEvent = 
(PipeTsFileInsertionEvent) event;
+      if (isDuplicated(pipeTsFileInsertionEvent)) {
+        // ditto
+        
pipeTsFileInsertionEvent.clearReferenceCount(PipeProcessorSubtask.class.getName());
+        return null;
+      }
+    }
+
+    return event;
+  }
+
+  private boolean isDuplicated(final PipeTsFileInsertionEvent event) {
+    final int hashCode = event.getTsFile().hashCode();
+    final boolean isGeneratedByHistoricalExtractor = 
event.isGeneratedByHistoricalExtractor();
+    final Boolean existedIsGeneratedByHistoricalExtractor =
+        hashCodeToIsGeneratedByHistoricalExtractor.getIfPresent(hashCode);
+    if (Objects.isNull(existedIsGeneratedByHistoricalExtractor)) {
+      hashCodeToIsGeneratedByHistoricalExtractor.put(hashCode, 
isGeneratedByHistoricalExtractor);
+      return false;
+    }
+    // Multiple PipeRawTabletInsertionEvents parsed from the same 
PipeTsFileInsertionEvent (i.e.,
+    // with the same isGeneratedByHistoricalExtractor field) are not 
considered duplicates.
+    if (Objects.equals(existedIsGeneratedByHistoricalExtractor, 
isGeneratedByHistoricalExtractor)) {

Review Comment:
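   Suggestion: `existedIsGeneratedByHistoricalExtractor` has already been null-checked above, so a plain `==` comparison against the primitive `boolean` (which auto-unboxes the `Boolean`) is sufficient here instead of `Objects.equals`:
   `if (existedIsGeneratedByHistoricalExtractor == isGeneratedByHistoricalExtractor) {`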



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to