jixuan1989 commented on a change in pull request #1367:
URL: https://github.com/apache/incubator-iotdb/pull/1367#discussion_r446853767
##########
File path:
server/src/main/java/org/apache/iotdb/db/query/control/QueryResourceManager.java
##########
@@ -67,29 +69,55 @@
private Map<Long, Set<WeakReference<TsFileResource>>> seqFileNumMap = new
ConcurrentHashMap<>();
private Map<Long, Set<WeakReference<TsFileResource>>> unseqFileNumMap = new
ConcurrentHashMap<>();
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
/**
* Record temporary files used for external sorting.
* <p>
* Key: query job id. Value: temporary file list used for external sorting.
*/
- private Map<Long, List<IExternalSortFileDeserializer>> externalSortFileMap;
+ private final Map<Long, List<IExternalSortFileDeserializer>>
externalSortFileMap;
+
+ private final Map<Long, Long> queryIdEstimatedMemoryMap;
+
+ // current total free memory for the reading process (not including the cache
memory)
+ private final AtomicLong totalFreeMemoryForRead;
+
+ // estimated memory size of one point, in bytes
+ private static final long POINT_ESTIMATED_SIZE = 16L;
private QueryResourceManager() {
filePathsManager = new QueryFileManager();
externalSortFileMap = new ConcurrentHashMap<>();
+ queryIdEstimatedMemoryMap = new ConcurrentHashMap<>();
+ totalFreeMemoryForRead = new AtomicLong(
+
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForReadWithoutCache());
}
public static QueryResourceManager getInstance() {
return QueryTokenManagerHelper.INSTANCE;
}
+ public int getMaxDeduplicatedPathNum(int fetchSize) {
+ return Math.max((int) ((totalFreeMemoryForRead.get() / fetchSize) /
POINT_ESTIMATED_SIZE), 0);
+ }
+
/**
* Register a new query. This method must be invoked when a query request is
first created.
*/
- public long assignQueryId(boolean isDataQuery) {
+ public long assignQueryId(boolean isDataQuery, int fetchSize, int
deduplicatedPathNum) {
long queryId = queryIdAtom.incrementAndGet();
if (isDataQuery) {
filePathsManager.addQueryId(queryId);
+ if (deduplicatedPathNum > 0) {
+ long estimatedMemoryUsage =
+ (long) deduplicatedPathNum * POINT_ESTIMATED_SIZE * (long)
fetchSize;
+ // apply the memory successfully
+ if (totalFreeMemoryForRead.addAndGet(-estimatedMemoryUsage) >= 0) {
+ queryIdEstimatedMemoryMap.put(queryId, estimatedMemoryUsage);
+ } else {
+ totalFreeMemoryForRead.addAndGet(estimatedMemoryUsage);
Review comment:
Why add memory here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]