jt2594838 commented on code in PR #13068:
URL: https://github.com/apache/iotdb/pull/13068#discussion_r1728455300
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java:
##########
@@ -607,16 +616,26 @@ private void recover() throws DataRegionException {
throw new DataRegionException(e);
}
- initCompactionSchedule();
+ if (asyncTsFileResourceRecoverTaskList.isEmpty()) {
+ initCompactionSchedule();
Review Comment:
Is this called after all tasks of a region finish?
##########
integration-test/src/main/java/org/apache/iotdb/it/env/EnvType.java:
##########
@@ -28,7 +28,7 @@ public enum EnvType {
TABLE_CLUSTER1;
public static EnvType getSystemEnvType() {
- String envValue = System.getProperty("TestEnv", Simple.name());
+ String envValue = System.getProperty("TestEnv", Cluster1.name());
return EnvType.valueOf(envValue);
Review Comment:
What is this for?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent;
+
+public class FileTimeIndexCacheRecorder {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileTimeIndexCacheRecorder.class);
+
+ private static final int VERSION = 0;
+
+ protected static final String FILE_NAME = "FileTimeIndexCache_" + VERSION;
+
+ private final ScheduledExecutorService recordFileIndexThread;
+
+ private final BlockingQueue<Runnable> taskQueue = new
LinkedBlockingQueue<>();
+
+ private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap =
+ new ConcurrentHashMap<>();
+
+ private FileTimeIndexCacheRecorder() {
+ recordFileIndexThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.FILE_TIMEINDEX_RECORD.getName());
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ recordFileIndexThread, this::executeTasks, 10, 10,
TimeUnit.MILLISECONDS);
+ }
+
+ private void executeTasks() {
+ Runnable task;
+ while ((task = taskQueue.poll()) != null) {
+ recordFileIndexThread.submit(task);
+ }
+ }
+
+ public void logFileTimeIndexes(List<TsFileResource> tsFileResources) {
+ if (!tsFileResources.isEmpty()) {
+ TsFileResource firstResource = tsFileResources.get(0);
+ TsFileID tsFileID = firstResource.getTsFileID();
+ int dataRegionId = tsFileID.regionId;
+ long partitionId = tsFileID.timePartitionId;
+ File dataRegionSysDir =
+ StorageEngine.getDataRegionSystemDir(
+ firstResource.getDatabaseName(),
firstResource.getDataRegionId());
+ FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId,
dataRegionSysDir);
+ boolean result =
+ taskQueue.offer(
+ () -> {
+ try {
+ ByteBuffer buffer = ByteBuffer.allocate(5 * Long.BYTES *
tsFileResources.size());
+ for (TsFileResource tsFileResource : tsFileResources) {
+ tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ }
+ buffer.flip();
+ writer.write(buffer);
Review Comment:
Better to add some comments for the 5 * 8 bytes.
It repeats several times; consider extracting it into a named static constant.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent;
+
+public class FileTimeIndexCacheRecorder {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileTimeIndexCacheRecorder.class);
+
+ private static final int VERSION = 0;
+
+ protected static final String FILE_NAME = "FileTimeIndexCache_" + VERSION;
+
+ private final ScheduledExecutorService recordFileIndexThread;
+
+ private final BlockingQueue<Runnable> taskQueue = new
LinkedBlockingQueue<>();
+
+ private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap =
+ new ConcurrentHashMap<>();
+
+ private FileTimeIndexCacheRecorder() {
+ recordFileIndexThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.FILE_TIMEINDEX_RECORD.getName());
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ recordFileIndexThread, this::executeTasks, 10, 10,
TimeUnit.MILLISECONDS);
+ }
+
+ private void executeTasks() {
+ Runnable task;
+ while ((task = taskQueue.poll()) != null) {
+ recordFileIndexThread.submit(task);
+ }
+ }
+
+ public void logFileTimeIndexes(List<TsFileResource> tsFileResources) {
+ if (!tsFileResources.isEmpty()) {
+ TsFileResource firstResource = tsFileResources.get(0);
+ TsFileID tsFileID = firstResource.getTsFileID();
+ int dataRegionId = tsFileID.regionId;
+ long partitionId = tsFileID.timePartitionId;
+ File dataRegionSysDir =
+ StorageEngine.getDataRegionSystemDir(
+ firstResource.getDatabaseName(),
firstResource.getDataRegionId());
+ FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId,
dataRegionSysDir);
+ boolean result =
+ taskQueue.offer(
+ () -> {
+ try {
+ ByteBuffer buffer = ByteBuffer.allocate(5 * Long.BYTES *
tsFileResources.size());
+ for (TsFileResource tsFileResource : tsFileResources) {
+ tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ }
+ buffer.flip();
+ writer.write(buffer);
+ } catch (IOException e) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache: {}",
e.getMessage());
+ }
+ });
+ if (!result) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache");
+ }
+ }
+ }
+
+ public void logFileTimeIndex(TsFileResource tsFileResource) {
Review Comment:
The two methods are highly similar. How about using `public void
logFileTimeIndex(TsFileResource... tsFileResources)`
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent;
+
+public class FileTimeIndexCacheRecorder {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileTimeIndexCacheRecorder.class);
+
+ private static final int VERSION = 0;
+
+ protected static final String FILE_NAME = "FileTimeIndexCache_" + VERSION;
+
+ private final ScheduledExecutorService recordFileIndexThread;
+
+ private final BlockingQueue<Runnable> taskQueue = new
LinkedBlockingQueue<>();
+
+ private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap =
+ new ConcurrentHashMap<>();
+
+ private FileTimeIndexCacheRecorder() {
+ recordFileIndexThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.FILE_TIMEINDEX_RECORD.getName());
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ recordFileIndexThread, this::executeTasks, 10, 10,
TimeUnit.MILLISECONDS);
+ }
+
+ private void executeTasks() {
+ Runnable task;
+ while ((task = taskQueue.poll()) != null) {
+ recordFileIndexThread.submit(task);
+ }
+ }
+
+ public void logFileTimeIndexes(List<TsFileResource> tsFileResources) {
+ if (!tsFileResources.isEmpty()) {
+ TsFileResource firstResource = tsFileResources.get(0);
+ TsFileID tsFileID = firstResource.getTsFileID();
+ int dataRegionId = tsFileID.regionId;
+ long partitionId = tsFileID.timePartitionId;
+ File dataRegionSysDir =
+ StorageEngine.getDataRegionSystemDir(
+ firstResource.getDatabaseName(),
firstResource.getDataRegionId());
+ FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId,
dataRegionSysDir);
+ boolean result =
+ taskQueue.offer(
+ () -> {
+ try {
+ ByteBuffer buffer = ByteBuffer.allocate(5 * Long.BYTES *
tsFileResources.size());
+ for (TsFileResource tsFileResource : tsFileResources) {
+ tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ }
+ buffer.flip();
+ writer.write(buffer);
+ } catch (IOException e) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache: {}",
e.getMessage());
+ }
+ });
+ if (!result) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache");
+ }
+ }
+ }
+
+ public void logFileTimeIndex(TsFileResource tsFileResource) {
+ TsFileID tsFileID = tsFileResource.getTsFileID();
+ int dataRegionId = tsFileID.regionId;
+ long partitionId = tsFileID.timePartitionId;
+ File dataRegionSysDir =
+ StorageEngine.getDataRegionSystemDir(
+ tsFileResource.getDatabaseName(),
tsFileResource.getDataRegionId());
+
+ FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId,
dataRegionSysDir);
+ boolean result =
+ taskQueue.offer(
+ () -> {
+ try {
+ ByteBuffer buffer = ByteBuffer.allocate(5 * Long.BYTES);
+ tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ buffer.flip();
+ writer.write(buffer);
+ } catch (IOException e) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache: {}",
e.getMessage());
+ }
+ });
+ if (!result) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache");
+ }
+ }
+
+ public void compactFileTimeIndexIfNeeded(
+ String dataBaseName,
+ int dataRegionId,
+ long partitionId,
+ TsFileResourceList sequenceFiles,
+ TsFileResourceList unsequenceFiles) {
+ FileTimeIndexCacheWriter writer =
+ getWriter(
+ dataRegionId,
+ partitionId,
+ StorageEngine.getDataRegionSystemDir(dataBaseName,
String.valueOf(dataRegionId)));
+
+ int currentResourceCount =
+ (sequenceFiles == null ? 0 : sequenceFiles.size())
+ + (unsequenceFiles == null ? 0 : unsequenceFiles.size());
+ if (writer.getLogFile().length() > currentResourceCount * (5 * Long.BYTES)
* 100) {
Review Comment:
After the task is submitted and before it is executed, the condition is
highly likely to remain true.
Is it possible that several redundant tasks are submitted during this period?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java:
##########
@@ -94,10 +79,52 @@ public void updateMultiDeviceFlushedTime(
timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
}
+ // For recover only
+ @Override
+ public void upgradeAndUpdateMultiDeviceFlushedTime(
+ long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
+ ILastFlushTime flushTimeMapForPartition =
+ partitionLatestFlushedTime.computeIfAbsent(
+ timePartitionId, id -> new DeviceLastFlushTime());
+ // upgrade
+ if (flushTimeMapForPartition instanceof PartitionLastFlushTime) {
+ long maxFlushTime = flushTimeMapForPartition.getLastFlushTime(null);
+ ILastFlushTime newDeviceLastFlushTime = new DeviceLastFlushTime();
+ long memIncr = 0;
+ for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
+ newDeviceLastFlushTime.updateLastFlushTime(entry.getKey(),
entry.getValue());
+ maxFlushTime = Math.max(maxFlushTime, entry.getValue());
+ }
+ long finalMemIncr = memIncr;
+ memCostForEachPartition.compute(
+ timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
+ } else {
+ // should not go here
+ long memIncr = 0;
Review Comment:
If so, how about printing a warning log with the stack?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java:
##########
@@ -178,17 +180,18 @@ public static void blockInsertionIfReject() throws
WriteProcessRejectException {
}
}
- public boolean isAllSgReady() {
- return isAllSgReady.get();
+ public boolean isReadyForReadAndWrite() {
+ return isReadyForReadAndWrite.get();
}
- public void setAllSgReady(boolean allSgReady) {
- isAllSgReady.set(allSgReady);
+ public boolean isReady() {
+ return isReady.get();
}
Review Comment:
`isReady()` needs a better name or some comments to distinguish it from
`isReadyForReadAndWrite()`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndexCacheRecorder.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.file.SystemFileFactory;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceList;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent;
+
+public class FileTimeIndexCacheRecorder {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FileTimeIndexCacheRecorder.class);
+
+ private static final int VERSION = 0;
+
+ protected static final String FILE_NAME = "FileTimeIndexCache_" + VERSION;
+
+ private final ScheduledExecutorService recordFileIndexThread;
+
+ private final BlockingQueue<Runnable> taskQueue = new
LinkedBlockingQueue<>();
+
+ private final Map<Integer, Map<Long, FileTimeIndexCacheWriter>> writerMap =
+ new ConcurrentHashMap<>();
+
+ private FileTimeIndexCacheRecorder() {
+ recordFileIndexThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.FILE_TIMEINDEX_RECORD.getName());
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ recordFileIndexThread, this::executeTasks, 10, 10,
TimeUnit.MILLISECONDS);
+ }
+
+ private void executeTasks() {
+ Runnable task;
+ while ((task = taskQueue.poll()) != null) {
+ recordFileIndexThread.submit(task);
+ }
+ }
+
+ public void logFileTimeIndexes(List<TsFileResource> tsFileResources) {
+ if (!tsFileResources.isEmpty()) {
+ TsFileResource firstResource = tsFileResources.get(0);
+ TsFileID tsFileID = firstResource.getTsFileID();
+ int dataRegionId = tsFileID.regionId;
+ long partitionId = tsFileID.timePartitionId;
+ File dataRegionSysDir =
+ StorageEngine.getDataRegionSystemDir(
+ firstResource.getDatabaseName(),
firstResource.getDataRegionId());
+ FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId,
dataRegionSysDir);
+ boolean result =
+ taskQueue.offer(
+ () -> {
+ try {
+ ByteBuffer buffer = ByteBuffer.allocate(5 * Long.BYTES *
tsFileResources.size());
+ for (TsFileResource tsFileResource : tsFileResources) {
+ tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ }
+ buffer.flip();
+ writer.write(buffer);
+ } catch (IOException e) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache: {}",
e.getMessage());
+ }
+ });
+ if (!result) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache");
+ }
+ }
+ }
+
+ public void logFileTimeIndex(TsFileResource tsFileResource) {
+ TsFileID tsFileID = tsFileResource.getTsFileID();
+ int dataRegionId = tsFileID.regionId;
+ long partitionId = tsFileID.timePartitionId;
+ File dataRegionSysDir =
+ StorageEngine.getDataRegionSystemDir(
+ tsFileResource.getDatabaseName(),
tsFileResource.getDataRegionId());
+
+ FileTimeIndexCacheWriter writer = getWriter(dataRegionId, partitionId,
dataRegionSysDir);
+ boolean result =
+ taskQueue.offer(
+ () -> {
+ try {
+ ByteBuffer buffer = ByteBuffer.allocate(5 * Long.BYTES);
+ tsFileResource.serializeFileTimeIndexToByteBuffer(buffer);
+ buffer.flip();
+ writer.write(buffer);
+ } catch (IOException e) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache: {}",
e.getMessage());
+ }
+ });
+ if (!result) {
+ LOGGER.warn("Meet error when record FileTimeIndexCache");
+ }
+ }
+
+ public void compactFileTimeIndexIfNeeded(
+ String dataBaseName,
+ int dataRegionId,
+ long partitionId,
+ TsFileResourceList sequenceFiles,
+ TsFileResourceList unsequenceFiles) {
+ FileTimeIndexCacheWriter writer =
+ getWriter(
+ dataRegionId,
+ partitionId,
+ StorageEngine.getDataRegionSystemDir(dataBaseName,
String.valueOf(dataRegionId)));
+
+ int currentResourceCount =
+ (sequenceFiles == null ? 0 : sequenceFiles.size())
+ + (unsequenceFiles == null ? 0 : unsequenceFiles.size());
+ if (writer.getLogFile().length() > currentResourceCount * (5 * Long.BYTES)
* 100) {
Review Comment:
Should add some comments and give the `100` a proper name.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/HashLastFlushTimeMap.java:
##########
@@ -94,10 +79,52 @@ public void updateMultiDeviceFlushedTime(
timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
}
+ // For recover only
+ @Override
+ public void upgradeAndUpdateMultiDeviceFlushedTime(
+ long timePartitionId, Map<IDeviceID, Long> flushedTimeMap) {
+ ILastFlushTime flushTimeMapForPartition =
+ partitionLatestFlushedTime.computeIfAbsent(
+ timePartitionId, id -> new DeviceLastFlushTime());
+ // upgrade
+ if (flushTimeMapForPartition instanceof PartitionLastFlushTime) {
+ long maxFlushTime = flushTimeMapForPartition.getLastFlushTime(null);
+ ILastFlushTime newDeviceLastFlushTime = new DeviceLastFlushTime();
+ long memIncr = 0;
+ for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
+ newDeviceLastFlushTime.updateLastFlushTime(entry.getKey(),
entry.getValue());
+ maxFlushTime = Math.max(maxFlushTime, entry.getValue());
+ }
+ long finalMemIncr = memIncr;
+ memCostForEachPartition.compute(
+ timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
+ } else {
+ // should not go here
+ long memIncr = 0;
+ for (Map.Entry<IDeviceID, Long> entry : flushedTimeMap.entrySet()) {
+ if (flushTimeMapForPartition.getLastFlushTime(entry.getKey()) ==
Long.MIN_VALUE) {
+ memIncr += HASHMAP_NODE_BASIC_SIZE + entry.getKey().ramBytesUsed();
+ }
+ flushTimeMapForPartition.updateLastFlushTime(entry.getKey(),
entry.getValue());
+ }
+ long finalMemIncr = memIncr;
+ memCostForEachPartition.compute(
+ timePartitionId, (k1, v1) -> v1 == null ? finalMemIncr : v1 +
finalMemIncr);
+ }
+ }
+
+ // For recover
@Override
- public void updateOneDeviceGlobalFlushedTime(IDeviceID path, long time) {
- globalLatestFlushedTimeForEachDevice.compute(
- path, (k, v) -> v == null ? time : Math.max(v, time));
+ public void updatePartitionFlushedTime(long timePartitionId, long
maxFlushedTime) {
+ ILastFlushTime flushTimeMapForPartition =
+ partitionLatestFlushedTime.computeIfAbsent(
+ timePartitionId, id -> new PartitionLastFlushTime(maxFlushedTime));
+
+ // todo
+ long memIncr = Long.BYTES;
+ flushTimeMapForPartition.updateLastFlushTime(null, maxFlushedTime);
+ memCostForEachPartition.putIfAbsent(timePartitionId, memIncr);
Review Comment:
todo what?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]