Alima777 commented on a change in pull request #2859:
URL: https://github.com/apache/iotdb/pull/2859#discussion_r595972473
##########
File path:
server/src/main/java/org/apache/iotdb/db/query/dataset/RawQueryDataSetWithValueFilter.java
##########
@@ -64,102 +65,144 @@ public RawQueryDataSetWithValueFilter(
@Override
public boolean hasNextWithoutConstraint() throws IOException {
- if (hasCachedRow) {
+ if (!cachedRowRecords.isEmpty()) {
return true;
}
- return cacheRowRecord();
+ return cacheRowRecords();
}
+ /** @return the first record of cached rows or null if there is no more data
*/
@Override
public RowRecord nextWithoutConstraint() throws IOException {
- if (!hasCachedRow && !cacheRowRecord()) {
+ if (cachedRowRecords.isEmpty() && !cacheRowRecords()) {
return null;
}
- hasCachedRow = false;
- return cachedRowRecord;
+
+ return cachedRowRecords.remove(0);
}
/**
- * Cache row record
+ * Cache row records
*
* @return if there has next row record.
*/
- private boolean cacheRowRecord() throws IOException {
- while (timeGenerator.hasNext()) {
- boolean hasField = false;
- long timestamp = timeGenerator.next();
- RowRecord rowRecord = new RowRecord(timestamp);
-
- for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
- Object value;
- // get value from readers in time generator
- if (cached.get(i)) {
- value = timeGenerator.getValue(paths.get(i), timestamp);
- } else {
- // get value from series reader without filter
- IReaderByTimestamp reader = seriesReaderByTimestampList.get(i);
- value = reader.getValueInTimestamp(timestamp);
- }
- if (value == null) {
- rowRecord.addField(null);
+ private boolean cacheRowRecords() throws IOException {
+ int cachedTimeCnt = 0;
+ long[] cachedTimeArray = new long[fetchSize];
+ // TODO: LIMIT constraint
+ // 1. fill time array from time Generator
+ while (timeGenerator.hasNext() && cachedTimeCnt < fetchSize) {
+ cachedTimeArray[cachedTimeCnt++] = timeGenerator.next();
+ }
+ if (cachedTimeCnt == 0) {
+ return false;
+ }
+ RowRecord[] rowRecords = new RowRecord[cachedTimeCnt];
+ for (int i = 0; i < cachedTimeCnt; i++) {
+ rowRecords[i] = new RowRecord(cachedTimeArray[i]);
+ }
+
+ boolean[] hasField = new boolean[cachedTimeCnt];
+ // 2. fetch results of each time series using time array
+ for (int i = 0; i < seriesReaderByTimestampList.size(); i++) {
+ Object[] results;
+ // get value from readers in time generator
+ if (cached.get(i)) {
+ results = timeGenerator.getValues(paths.get(i));
+ } else {
+ results =
+ seriesReaderByTimestampList
+ .get(i)
+ .getValuesInTimestamps(cachedTimeArray, cachedTimeCnt);
+ }
+
+ // 3. use values in results to fill row record
+ for (int j = 0; j < cachedTimeCnt; j++) {
+ if (results[j] == null) {
+ rowRecords[j].addField(null);
} else {
- hasField = true;
- rowRecord.addField(value, dataTypes.get(i));
+ hasField[j] = true;
+ rowRecords[j].addField(results[j], dataTypes.get(i));
}
}
- if (hasField) {
- hasCachedRow = true;
- cachedRowRecord = rowRecord;
- break;
+ }
+ // 4. remove rowRecord if all values in one timestamp are null
+ for (int i = 0; i < cachedTimeCnt; i++) {
+ if (hasField[i]) {
+ cachedRowRecords.add(rowRecords[i]);
}
}
- return hasCachedRow;
+
+ // 5. check whether there is next row record
+ return !cachedRowRecords.isEmpty();
Review comment:
Fixed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org