fanhualta commented on a change in pull request #32: fix sonar issues
URL: https://github.com/apache/incubator-iotdb/pull/32#discussion_r251860194
##########
File path:
iotdb/src/main/java/org/apache/iotdb/db/engine/filenode/FileNodeManager.java
##########
@@ -709,180 +693,182 @@ public boolean appendFileToFileNode(String
fileNodeName, IntervalFileNode append
* @throws FileNodeManagerException FileNodeManagerException
*/
public void mergeAll() throws FileNodeManagerException {
- if (fileNodeManagerStatus == FileNodeManagerStatus.NONE) {
- fileNodeManagerStatus = FileNodeManagerStatus.MERGE;
- LOGGER.info("Start to merge all overflowed filenode");
- List<String> allFileNodeNames;
+ if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
+ LOGGER.warn("Failed to merge all overflowed filenode, because filenode
manager status is {}",
+ fileNodeManagerStatus);
+ return;
+ }
+
+ fileNodeManagerStatus = FileNodeManagerStatus.MERGE;
+ LOGGER.info("Start to merge all overflowed filenode");
+ List<String> allFileNodeNames;
+ try {
+ allFileNodeNames = MManager.getInstance().getAllFileNames();
+ } catch (PathErrorException e) {
+ LOGGER.error("Get all storage group seriesPath error,", e);
+ throw new FileNodeManagerException(e);
+ }
+ List<Future<?>> futureTasks = new ArrayList<>();
+ for (String fileNodeName : allFileNodeNames) {
+ FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true);
try {
- allFileNodeNames = MManager.getInstance().getAllFileNames();
- } catch (PathErrorException e) {
- LOGGER.error("Get all storage group seriesPath error,", e);
- e.printStackTrace();
- throw new FileNodeManagerException(e);
+ Future<?> task = fileNodeProcessor.submitToMerge();
+ if (task != null) {
+ LOGGER.info("Submit the filenode {} to the merge pool",
fileNodeName);
+ futureTasks.add(task);
+ }
+ } finally {
+ fileNodeProcessor.writeUnlock();
}
- List<Future<?>> futureTasks = new ArrayList<>();
- for (String fileNodeName : allFileNodeNames) {
- FileNodeProcessor fileNodeProcessor = getProcessor(fileNodeName, true);
+ }
+ long totalTime = 0;
+ // loop waiting for merge to end, the longest waiting time is
+ // 60s.
+ int time = 2;
+ for (Future<?> task : futureTasks) {
+ while (!task.isDone()) {
try {
- Future<?> task = fileNodeProcessor.submitToMerge();
- if (task != null) {
- LOGGER.info("Submit the filenode {} to the merge pool",
fileNodeName);
- futureTasks.add(task);
- }
- } finally {
- fileNodeProcessor.writeUnlock();
+ LOGGER.info(
+ "Waiting for the end of merge, already waiting for {}s, "
+ + "continue to wait anothor {}s",
+ totalTime, time);
+ TimeUnit.SECONDS.sleep(time);
+ totalTime += time;
+ time = updateWaitTime(time);
+ } catch (InterruptedException e) {
+ LOGGER.error("Unexpected interruption {}", e);
+ Thread.currentThread().interrupt();
}
}
- long totalTime = 0;
- // loop waiting for merge to end, the longest waiting time is
- // 60s.
- int time = 2;
- for (Future<?> task : futureTasks) {
- while (!task.isDone()) {
- try {
- LOGGER.info(
- "Waiting for the end of merge, already waiting for {}s, "
- + "continue to wait anothor {}s",
- totalTime, time);
- TimeUnit.SECONDS.sleep(time);
- totalTime += time;
- if (time < 32) {
- time = time * 2;
- } else {
- time = 60;
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ }
+ fileNodeManagerStatus = FileNodeManagerStatus.NONE;
+ LOGGER.info("End to merge all overflowed filenode");
+ }
+
+ private int updateWaitTime(int time) {
+ return time < 32 ? time * 2 : 60;
+ }
+
+ /**
+ * try to close the filenode processor. The name of filenode processor is
processorName
+ */
+ private boolean closeOneProcessor(String processorName) throws
FileNodeManagerException {
+ if (!processorMap.containsKey(processorName)) {
+ return true;
+ }
+
+ Processor processor = processorMap.get(processorName);
+ if (processor.tryWriteLock()) {
+ try {
+ if (processor.canBeClosed()) {
+ processor.close();
+ return true;
+ } else {
+ return false;
}
+ } catch (ProcessorException e) {
+ LOGGER.error("Close the filenode processor {} error.", processorName,
e);
+ throw new FileNodeManagerException(e);
+ } finally {
+ processor.writeUnlock();
}
- fileNodeManagerStatus = FileNodeManagerStatus.NONE;
- LOGGER.info("End to merge all overflowed filenode");
} else {
- LOGGER.warn("Failed to merge all overflowed filenode, because filenode
manager status is {}",
- fileNodeManagerStatus);
+ return false;
}
}
/**
- * try to close the filenode processor. The name of filenode processor is
processorName
+ * delete one filenode.
*/
- private boolean closeOneProcessor(String processorName) throws
FileNodeManagerException {
- if (processorMap.containsKey(processorName)) {
- Processor processor = processorMap.get(processorName);
+ public void deleteOneFileNode(String processorName) throws
FileNodeManagerException {
+ if (fileNodeManagerStatus != FileNodeManagerStatus.NONE) {
+ return;
+ }
+
+ fileNodeManagerStatus = FileNodeManagerStatus.CLOSE;
+ try {
+ if (processorMap.containsKey(processorName)) {
+ deleteFileNodeBlocked(processorName);
+ }
+ String fileNodePath = TsFileDBConf.fileNodeDir;
+ fileNodePath = standardizeDir(fileNodePath) + processorName;
+ FileUtils.deleteDirectory(new File(fileNodePath));
+
+ cleanBufferWrite(processorName);
+ cleanBufferWrite(processorName);
+
+ MultiFileLogNodeManager.getInstance()
+ .deleteNode(processorName +
IoTDBConstant.BUFFERWRITE_LOG_NODE_SUFFIX);
+ MultiFileLogNodeManager.getInstance()
+ .deleteNode(processorName +
IoTDBConstant.OVERFLOW_LOG_NODE_SUFFIX);
+ } catch (IOException e) {
+ LOGGER.error("Delete the filenode processor {} error.", processorName,
e);
+ throw new FileNodeManagerException(e);
+ } finally {
+ fileNodeManagerStatus = FileNodeManagerStatus.NONE;
+ }
+ }
+
+ private void cleanBufferWrite(String processorName) throws IOException {
+ List<String> bufferwritePathList = directories.getAllTsFileFolders();
+ for (String bufferwritePath : bufferwritePathList) {
+ bufferwritePath = standardizeDir(bufferwritePath) + processorName;
+ File bufferDir = new File(bufferwritePath);
+ // free and close the streams under this bufferwrite directory
+ if (!bufferDir.exists())
+ continue;
+ File[] bufferFiles = bufferDir.listFiles();
+ if (bufferFiles != null) {
+ for (File bufferFile : bufferFiles) {
+
FileReaderManager.getInstance().closeFileAndRemoveReader(bufferFile.getPath());
+ }
+ }
+ FileUtils.deleteDirectory(new File(bufferwritePath));
+ }
+ }
+
+ private void deleteFileNodeBlocked(String processorName) throws
FileNodeManagerException {
+ LOGGER.info("Forced to delete the filenode processor {}", processorName);
+ FileNodeProcessor processor = processorMap.get(processorName);
+ while (true) {
if (processor.tryWriteLock()) {
try {
if (processor.canBeClosed()) {
- processor.close();
- return true;
+ LOGGER.info("Delete the filenode processor {}.", processorName);
+ processor.delete();
+ processorMap.remove(processorName);
+ break;
} else {
- return false;
+ LOGGER.info(
+ "Can't delete the filenode processor {}, "
+ + "because the filenode processor can't be closed."
+ + " Wait 100ms to retry");
}
} catch (ProcessorException e) {
- LOGGER.error("Close the filenode processor {} error.",
processorName, e);
+ LOGGER.error("Delete the filenode processor {} error.",
processorName, e);
throw new FileNodeManagerException(e);
} finally {
processor.writeUnlock();
}
} else {
- return false;
+ LOGGER.info(
+ "Can't delete the filenode processor {}, because it can't get
the write lock."
+ + " Wait 100ms to retry");
}
Review comment:
Please add the processor name argument to this log message, i.e. {..., processorName}, so the placeholder is filled in.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services