luoluoyuyu commented on code in PR #13167:
URL: https://github.com/apache/iotdb/pull/13167#discussion_r1718135369
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java:
##########
@@ -208,6 +225,58 @@ protected boolean executeOnce() throws Exception {
return true;
}
+ private synchronized Event filter(final Event event) { // make it synchronized
+ if (Objects.isNull(event)) {
+ return null;
+ }
+
+ if (event instanceof PipeRawTabletInsertionEvent) {
+ final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+ (PipeRawTabletInsertionEvent) event;
+ final EnrichedEvent sourceEvent =
pipeRawTabletInsertionEvent.getSourceEvent();
+ if (sourceEvent instanceof PipeTsFileInsertionEvent
+ && isDuplicated((PipeTsFileInsertionEvent) sourceEvent)) {
+ // Discard duplicate event, because pipeConsensus doesn't expect to assign a commit id for
+ // duplicate events to ensure that events can be received in order. The consequence of this
+ // is that ProgressIndex is not updated in time, but this will not affect the correctness.
+
pipeRawTabletInsertionEvent.clearReferenceCount(PipeProcessorSubtask.class.getName());
+ return null;
+ }
+ }
+
+ if (event instanceof PipeTsFileInsertionEvent) {
+ final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) event;
+ if (isDuplicated(pipeTsFileInsertionEvent)) {
+ // ditto
+
pipeTsFileInsertionEvent.clearReferenceCount(PipeProcessorSubtask.class.getName());
+ return null;
+ }
+ }
+
+ return event;
+ }
+
+ private boolean isDuplicated(final PipeTsFileInsertionEvent event) {
+ final int hashCode = event.getTsFile().hashCode();
+ final boolean isGeneratedByHistoricalExtractor =
event.isGeneratedByHistoricalExtractor();
+ final Boolean existedIsGeneratedByHistoricalExtractor =
+ hashCodeToIsGeneratedByHistoricalExtractor.getIfPresent(hashCode);
+ if (Objects.isNull(existedIsGeneratedByHistoricalExtractor)) {
+ hashCodeToIsGeneratedByHistoricalExtractor.put(hashCode,
isGeneratedByHistoricalExtractor);
+ return false;
+ }
+ // Multiple PipeRawTabletInsertionEvents parsed from the same PipeTsFileInsertionEvent (i.e.,
+ // with the same isGeneratedByHistoricalExtractor field) are not considered duplicates.
+ if (Objects.equals(existedIsGeneratedByHistoricalExtractor,
isGeneratedByHistoricalExtractor)) {
Review Comment:
if (existedIsGeneratedByHistoricalExtractor == isGeneratedByHistoricalExtractor) {
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java:
##########
@@ -208,6 +225,58 @@ protected boolean executeOnce() throws Exception {
return true;
}
+ private synchronized Event filter(final Event event) { // make it synchronized
+ if (Objects.isNull(event)) {
+ return null;
+ }
+
+ if (event instanceof PipeRawTabletInsertionEvent) {
+ final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
+ (PipeRawTabletInsertionEvent) event;
+ final EnrichedEvent sourceEvent =
pipeRawTabletInsertionEvent.getSourceEvent();
+ if (sourceEvent instanceof PipeTsFileInsertionEvent
+ && isDuplicated((PipeTsFileInsertionEvent) sourceEvent)) {
+ // Discard duplicate event, because pipeConsensus doesn't expect to assign a commit id for
+ // duplicate events to ensure that events can be received in order. The consequence of this
+ // is that ProgressIndex is not updated in time, but this will not affect the correctness.
+
pipeRawTabletInsertionEvent.clearReferenceCount(PipeProcessorSubtask.class.getName());
+ return null;
+ }
+ }
+
+ if (event instanceof PipeTsFileInsertionEvent) {
+ final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) event;
+ if (isDuplicated(pipeTsFileInsertionEvent)) {
+ // ditto
+
pipeTsFileInsertionEvent.clearReferenceCount(PipeProcessorSubtask.class.getName());
+ return null;
+ }
+ }
+
+ return event;
+ }
+
+ private boolean isDuplicated(final PipeTsFileInsertionEvent event) {
+ final int hashCode = event.getTsFile().hashCode();
+ final boolean isGeneratedByHistoricalExtractor =
event.isGeneratedByHistoricalExtractor();
+ final Boolean existedIsGeneratedByHistoricalExtractor =
+ hashCodeToIsGeneratedByHistoricalExtractor.getIfPresent(hashCode);
+ if (Objects.isNull(existedIsGeneratedByHistoricalExtractor)) {
+ hashCodeToIsGeneratedByHistoricalExtractor.put(hashCode,
isGeneratedByHistoricalExtractor);
+ return false;
+ }
+ // Multiple PipeRawTabletInsertionEvents parsed from the same PipeTsFileInsertionEvent (i.e.,
+ // with the same isGeneratedByHistoricalExtractor field) are not considered duplicates.
+ if (Objects.equals(existedIsGeneratedByHistoricalExtractor,
isGeneratedByHistoricalExtractor)) {
Review Comment:
if (existedIsGeneratedByHistoricalExtractor == isGeneratedByHistoricalExtractor) {
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]