jt2594838 commented on a change in pull request #1474:
URL: https://github.com/apache/incubator-iotdb/pull/1474#discussion_r453404586
##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -252,18 +267,24 @@ public StorageGroupProcessor getProcessor(String path)
throws StorageEngineExcep
StorageGroupProcessor processor;
processor = processorMap.get(storageGroupName);
if (processor == null) {
- storageGroupName = storageGroupName.intern();
- synchronized (storageGroupName) {
- processor = processorMap.get(storageGroupName);
- if (processor == null) {
- logger.info("construct a processor instance, the storage group is
{}, Thread is {}",
+ // if finish recover
+ if (isAllSgReady.get()) {
+ storageGroupName = storageGroupName.intern();
+ synchronized (storageGroupName) {
+ processor = processorMap.get(storageGroupName);
+ if (processor == null) {
+ logger.info("construct a processor instance, the storage group
is {}, Thread is {}",
storageGroupName, Thread.currentThread().getId());
- processor = new StorageGroupProcessor(systemDir, storageGroupName,
fileFlushPolicy);
- StorageGroupMNode storageGroup = IoTDB.metaManager
+ processor = new StorageGroupProcessor(systemDir,
storageGroupName, fileFlushPolicy);
+ StorageGroupMNode storageGroup = IoTDB.metaManager
.getStorageGroupNode(storageGroupName);
- processor.setDataTTL(storageGroup.getDataTTL());
- processorMap.put(storageGroupName, processor);
+ processor.setDataTTL(storageGroup.getDataTTL());
+ processorMap.put(storageGroupName, processor);
+ }
}
+ } else {
+ // not finished recover, refuse the request
+ throw new StorageEngineException("the sg " + storageGroupName + "
may not ready now, please wait and retry later");
Review comment:
I think this is not very friendly. Some users may send requests as soon as
the server is up, without checking whether the recovery has finished, and they
will be upset when those requests fail.
It may help to wait here with a timeout, so there is less chance that the
user's first operations are rejected.
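A minimal sketch of the wait-with-timeout idea, assuming a latch that the recovery thread releases once all storage groups are ready. `RecoveryGate`, `markRecovered` and `awaitRecovered` are hypothetical names for illustration and are not part of this PR:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not an existing IoTDB class.
public class RecoveryGate {
  // Counted down exactly once, when recovery of all storage groups finishes.
  private final CountDownLatch recovered = new CountDownLatch(1);

  /** Called by the recovery thread when it is done. */
  public void markRecovered() {
    recovered.countDown();
  }

  /**
   * Blocks until recovery finishes or the timeout elapses.
   *
   * @return true if recovery finished in time, false on timeout
   */
  public boolean awaitRecovered(long timeout, TimeUnit unit) throws InterruptedException {
    return recovered.await(timeout, unit);
  }

  public static void main(String[] args) throws Exception {
    RecoveryGate gate = new RecoveryGate();
    // Stand-in for the background recovery task.
    Thread recovery = new Thread(() -> {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      gate.markRecovered();
    });
    recovery.start();

    // A request waits a bounded time instead of failing immediately.
    if (gate.awaitRecovered(5, TimeUnit.SECONDS)) {
      System.out.println("recovered, serving request");
    } else {
      System.out.println("still recovering, please retry later");
    }
    recovery.join();
  }
}
```

With this shape, `getProcessor` would only throw the "not ready" exception after the timeout, so early requests succeed whenever recovery completes quickly.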
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -792,8 +792,9 @@ private void
tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestF
continue;
}
// Update cached last value with high priority
- ((MeasurementMNode) manager.getChild(node, measurementList[i]))
- .updateCachedLast(plan.composeLastTimeValuePair(i), true,
latestFlushedTime);
+ Path tmpPath = new Path(plan.getDeviceId(), measurementList[i]);
+ manager.updateLastCache(tmpPath.getFullPath(),
+ plan.composeLastTimeValuePair(i), true, latestFlushedTime);
Review comment:
Using the device node will shorten the search path (otherwise it will
search from the root node every time), so I would suggest you add the device
node as a nullable parameter of `MManager.updateLastCache` and pass it
whenever it is available.
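To illustrate why passing the device node helps, here is a toy sketch on a simplified tree. It is not the real `MManager` or MNode code: `LastCacheSketch`, `Node`, `resolve` and the `nodesVisited` counter are all assumptions made up for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only; the real metadata tree and its API differ.
public class LastCacheSketch {
  public static class Node {
    public final Map<String, Node> children = new HashMap<>();
    public long cachedLastTime = Long.MIN_VALUE;

    // Creates the child on demand to keep the toy short.
    public Node child(String name) {
      return children.computeIfAbsent(name, k -> new Node());
    }
  }

  public final Node root = new Node();
  public int nodesVisited = 0; // counts tree steps, for illustration

  public Node resolve(Node start, String[] names) {
    Node cur = start;
    for (String name : names) {
      cur = cur.child(name);
      nodesVisited++;
    }
    return cur;
  }

  // deviceNode is nullable: when present, the walk from the root is skipped.
  public void updateLastCache(String deviceId, String measurement, long time, Node deviceNode) {
    Node device = deviceNode != null ? deviceNode : resolve(root, deviceId.split("\\."));
    Node leaf = resolve(device, new String[] {measurement});
    if (time > leaf.cachedLastTime) {
      leaf.cachedLastTime = time;
    }
  }
}
```

In a tablet insert every measurement shares one device, so resolving the device node once and reusing it turns each update into a single child lookup.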
##########
File path: server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
##########
@@ -123,7 +120,22 @@ private void setUp() throws StartupException {
if (IoTDBDescriptor.getInstance().getConfig().isEnableMQTTService()) {
registerManager.register(MQTTService.getInstance());
}
- logger.info("IoTDB is set up.");
+
+ logger.info("IoTDB is set up, now may some sgs are not ready, don't worry,
the superman Dr. Huang will teach the sg to fix themselves.");
+
+ while (!StorageEngine.getInstance().isAllSgReady()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
Review comment:
How about just waiting where you created that single thread pool and
shutting it down?
##########
File path: server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
##########
@@ -135,6 +140,15 @@ private StorageEngine() {
// recover upgrade process
UpgradeUtils.recoverUpgrade();
+ recover();
+ }
+
+ public void recover() {
+ ExecutorService executors = Executors.newSingleThreadExecutor();
+ executors.submit(this::recoverAllSgs);
Review comment:
When is this pool shut down? And what is the reason for not waiting
until the recovery finishes?
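One possible shape for this, sketched with the standard `ExecutorService` calls: submit the task, call `shutdown()` so the pool's thread exits once the task completes, then optionally block with `awaitTermination`. `RecoverySketch` and the body of `recoverAllSgs` are stand-ins, not the PR's code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in class; only the executor handling is the point here.
public class RecoverySketch {
  private final AtomicBoolean allSgReady = new AtomicBoolean(false);

  // Placeholder for the real recovery work.
  private void recoverAllSgs() {
    allSgReady.set(true);
  }

  /**
   * Runs recovery on a single-thread pool and waits for it.
   *
   * @return true if recovery finished within the timeout
   */
  public boolean recover(long timeout, TimeUnit unit) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(this::recoverAllSgs);
    // shutdown() rejects new tasks but lets the submitted one finish,
    // so the worker thread does not linger afterwards.
    executor.shutdown();
    return executor.awaitTermination(timeout, unit);
  }

  public boolean isAllSgReady() {
    return allSgReady.get();
  }
}
```

Waiting here would also make the sleep loop in `IoTDB.setUp()` unnecessary, since the caller learns directly whether recovery completed.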
##########
File path:
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
##########
@@ -792,8 +792,9 @@ private void
tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestF
continue;
}
// Update cached last value with high priority
- ((MeasurementMNode) manager.getChild(node, measurementList[i]))
- .updateCachedLast(plan.composeLastTimeValuePair(i), true,
latestFlushedTime);
+ Path tmpPath = new Path(plan.getDeviceId(), measurementList[i]);
+ manager.updateLastCache(tmpPath.getFullPath(),
+ plan.composeLastTimeValuePair(i), true, latestFlushedTime);
Review comment:
And it is a little wasteful to join deviceId and measurementId using a
Path. You may just use string concatenation.
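For example, a minimal sketch of the concatenation, assuming `.` is the separator between the device ID and the measurement ID (`FullPathSketch` and `fullPath` are hypothetical names):

```java
// Hypothetical helper showing concatenation without a Path object.
public class FullPathSketch {
  // Assumption: path levels are separated by a dot.
  private static final String PATH_SEPARATOR = ".";

  public static String fullPath(String deviceId, String measurementId) {
    return deviceId + PATH_SEPARATOR + measurementId;
  }

  public static void main(String[] args) {
    System.out.println(fullPath("root.sg1.d1", "s1")); // prints root.sg1.d1.s1
  }
}
```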