SteveYurongSu commented on code in PR #11757:
URL: https://github.com/apache/iotdb/pull/11757#discussion_r1436043022
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java:
##########
@@ -130,22 +144,52 @@ public ProgressIndex getProgressIndex() {
@Override
public PipeInsertNodeTabletInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName, PipeTaskMeta pipeTaskMeta, String pattern) {
+ String pipeName, PipeTaskMeta pipeTaskMeta, String pattern, long
startTime, long endTime) {
return new PipeInsertNodeTabletInsertionEvent(
walEntryHandler,
progressIndex,
isAligned,
isGeneratedByPipe,
pipeName,
pipeTaskMeta,
- pattern);
+ pattern,
+ startTime,
+ endTime);
}
@Override
public boolean isGeneratedByPipe() {
return isGeneratedByPipe;
}
+ @Override
+ public boolean isEventTimeOverlappedWithTimeRange() {
+ try {
+ InsertNode insertNode = getInsertNode();
+ if (insertNode instanceof InsertRowNode) {
+ long timestamp = ((InsertRowNode) insertNode).getTime();
+ return startTime <= timestamp && timestamp <= endTime;
+ } else if (insertNode instanceof InsertTabletNode) {
+ long maxTimestamp = Long.MIN_VALUE;
+ long minTimestamp = Long.MAX_VALUE;
+ for (long timestamp : ((InsertTabletNode) insertNode).getTimes()) {
+ maxTimestamp = Math.max(maxTimestamp, timestamp);
+ minTimestamp = Math.min(minTimestamp, timestamp);
+ }
+ return startTime <= maxTimestamp && minTimestamp <= endTime;
+ } else {
+ throw new UnSupportedDataTypeException(
+ String.format("InsertNode type %s is not supported.",
insertNode.getClass().getName()));
+ }
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Exception occurred when determining the event time of
PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: {}.
Returning true to ensure data integrity.",
+ this,
+ e.getMessage());
Review Comment:
```suggestion
LOGGER.warn(
"Exception occurred when determining the event time of
PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}].
Returning true to ensure data integrity.",
this,
startTime,
endTime,
e);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org