jt2594838 commented on a change in pull request #1865:
URL: https://github.com/apache/iotdb/pull/1865#discussion_r514949721
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTask.java
##########
@@ -191,12 +191,11 @@ private AppendEntriesRequest
prepareRequest(List<ByteBuffer> logList, int startP
logger.error("getTerm failed for newly append entries", e);
}
}
+ logger.debug("{}, node={} catchup request={}", raftMember.getName(), node,
request.toString());
Review comment:
Replace `request.toString()` with simply `request`.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -160,31 +216,28 @@ public LogManagerMeta getMeta() {
* Recover all the logs in disk. This function will be called once this
instance is created.
*/
@Override
- public List<Log> getAllEntries() {
- List<Log> logs = recoverLog();
- int size = logs.size();
- if (size != 0 && meta.getLastLogIndex() <= logs.get(size -
1).getCurrLogIndex()) {
- meta.setLastLogTerm(logs.get(size - 1).getCurrLogTerm());
- meta.setLastLogIndex(logs.get(size - 1).getCurrLogIndex());
- meta.setCommitLogTerm(logs.get(size - 1).getCurrLogTerm());
- meta.setCommitLogIndex(logs.get(size - 1).getCurrLogIndex());
+ public List<Log> getAllEntriesBeforeAppliedIndex() {
+ logger.debug("getAllEntriesBeforeAppliedIndex,
maxHaveAppliedCommitIndex={}, commitLogIndex={}",
+ meta.getMaxHaveAppliedCommitIndex(), meta.getCommitLogIndex());
+ if (meta.getMaxHaveAppliedCommitIndex() >= meta.getCommitLogIndex()) {
+ return Collections.emptyList();
}
- return logs;
+ return getLogs(meta.getMaxHaveAppliedCommitIndex(),
meta.getCommitLogIndex());
Review comment:
It seems more like `getAllEntriesAfterAppliedIndex` instead of
`getAllEntriesBeforeAppliedIndex`.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -203,47 +256,113 @@ public void append(List<Log> entries) throws IOException
{
*/
private void putLogs(List<Log> entries) {
for (Log log : entries) {
- logBuffer.mark();
+ logDataBuffer.mark();
+ logIndexBuffer.mark();
ByteBuffer logData = log.serialize();
int size = logData.capacity() + Integer.BYTES;
try {
- logBuffer.putInt(logData.capacity());
- logBuffer.put(logData);
- logSizeDeque.addLast(size);
- bufferedLogNum++;
+ logDataBuffer.putInt(logData.capacity());
+ logDataBuffer.put(logData);
+ logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
+ logIndexOffsetList.add(offsetOfTheCurrentLogDataOutputStream);
+ offsetOfTheCurrentLogDataOutputStream += size;
} catch (BufferOverflowException e) {
logger.info("Raft log buffer overflow!");
- logBuffer.reset();
+ logDataBuffer.reset();
+ logIndexBuffer.reset();
flushLogBuffer();
- logBuffer.putInt(logData.capacity());
- logBuffer.put(logData);
- logSizeDeque.addLast(size);
- bufferedLogNum++;
+ checkCloseCurrentFile(log.getCurrLogIndex() - 1);
+ logDataBuffer.putInt(logData.capacity());
+ logDataBuffer.put(logData);
+ logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
+ logIndexOffsetList.add(offsetOfTheCurrentLogDataOutputStream);
+ offsetOfTheCurrentLogDataOutputStream += size;
+ }
+ }
+ }
+
+ private void checkCloseCurrentFile(long commitIndex) {
+ if (offsetOfTheCurrentLogDataOutputStream >
maxRaftLogPersistDataSizePerFile) {
+ try {
+ closeCurrentFile(commitIndex);
+ serializeMeta(meta);
+ createNewLogFile(logDir, commitIndex + 1);
+ } catch (IOException e) {
+ logger.error("check close current file failed", e);
}
}
}
+ private void closeCurrentFile(long commitIndex) throws IOException {
+ lock.writeLock().lock();
+ try {
+ if (currentLogDataOutputStream != null) {
+ currentLogDataOutputStream.close();
+ currentLogDataOutputStream = null;
+ }
+
+ if (currentLogIndexOutputStream != null) {
+ currentLogIndexOutputStream.close();
+ currentLogIndexOutputStream = null;
+ }
+ File currentLogDataFile = getCurrentLogDataFile();
+ String newDataFileName = currentLogDataFile.getName()
+ .replaceAll(String.valueOf(Long.MAX_VALUE),
String.valueOf(commitIndex));
+ File newCurrentLogDatFile = SystemFileFactory.INSTANCE
+ .getFile(currentLogDataFile.getParent() + File.separator +
newDataFileName);
+ if (!currentLogDataFile.renameTo(newCurrentLogDatFile)) {
+ logger.error("rename log data file={} failed",
currentLogDataFile.getAbsoluteFile());
+ }
+ logDataFileList.remove(logDataFileList.size() - 1);
+ logDataFileList.add(newCurrentLogDatFile);
Review comment:
Maybe `list.set()` is enough for this.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -50,82 +53,140 @@
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SyncLogDequeSerializer implements StableEntryManager {
private static final Logger logger =
LoggerFactory.getLogger(SyncLogDequeSerializer.class);
- private static final String LOG_FILE_PREFIX = ".data";
+ private static final String LOG_DATA_FILE_SUFFIX = "data";
+ private static final String LOG_INDEX_FILE_SUFFIX = "idx";
+
+ /**
+ * the log data files
+ */
+ private List<File> logDataFileList;
+
+ /**
+ * the log index files
+ */
+ private List<File> logIndexFileList;
- List<File> logFileList;
private LogParser parser = LogParser.getINSTANCE();
private File metaFile;
- private FileOutputStream currentLogOutputStream;
- private Deque<Integer> logSizeDeque = new ArrayDeque<>();
+ private FileOutputStream currentLogDataOutputStream;
+ private FileOutputStream currentLogIndexOutputStream;
private LogManagerMeta meta;
private HardState state;
- // mark first log position
- private long firstLogPosition = 0;
- // removed log size
- private long removedLogSize = 0;
- // when the removedLogSize larger than this, we actually delete logs
- private long maxRemovedLogSize = ClusterDescriptor.getInstance().getConfig()
- .getMaxUnsnapshotLogSize();
- // min version of available log
+
+ /**
+ * min version of available log
+ */
private long minAvailableVersion = 0;
- // max version of available log
+
+ /**
+ * max version of available log
+ */
private long maxAvailableVersion = Long.MAX_VALUE;
- // log dir
+
private String logDir;
- // version controller
+
private VersionController versionController;
- private ByteBuffer logBuffer = ByteBuffer
+ private ByteBuffer logDataBuffer = ByteBuffer
.allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+ private ByteBuffer logIndexBuffer = ByteBuffer
+
.allocate(ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize());
+
+ private long offsetOfTheCurrentLogDataOutputStream = 0;
+
+ /**
+ * file name pattern:
+ * <p>
+ * for log data file: ${startTime}-${Long.MAX_VALUE}-{version}-data
+ * <p>
+ * for log index file: ${startTime}-${Long.MAX_VALUE}-{version}-idx
Review comment:
Is it really `startTime`? I think this should be made clearer here.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -349,9 +495,136 @@ private void checkLogFile(File file) {
} catch (IOException e) {
logger.warn("Cannot delete outdated log file {}", file);
}
+ return false;
+ }
+
+ String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
+ // start index should be smaller than end index
+ if (Long.parseLong(splits[0]) > Long.parseLong(splits[1])) {
+ try {
+ Files.delete(file.toPath());
+ } catch (IOException e) {
+ logger.warn("Cannot delete incorrect log file {}", file);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ private void recoverTheLastLogFile() {
+ if (logIndexFileList.isEmpty()) {
+ logger.info("no log index file to recover");
+ return;
+ }
+
+ File lastIndexFile = logIndexFileList.get(logIndexFileList.size() - 1);
+ long endIndex =
Long.parseLong(lastIndexFile.getName().split(FILE_NAME_SEPARATOR)[1]);
+ boolean success = true;
+ if (endIndex != Long.MAX_VALUE) {
+ logger.info("last log index file={} no need to recover",
lastIndexFile.getAbsoluteFile());
+ } else {
+ success = recoverTheLastLogIndexFile(lastIndexFile);
+ }
+
+ if (!success) {
+ logger.error("recover log index file failed, clear all logs in disk, {}",
+ lastIndexFile.getAbsoluteFile());
+ for (int i = 0; i < logIndexFileList.size(); i++) {
+ deleteLogDataAndIndexFile(i);
+ }
+ clearFirstLogIndex();
+
+ return;
+ }
+
+ File lastDataFile = logDataFileList.get(logDataFileList.size() - 1);
+ endIndex =
Long.parseLong(lastDataFile.getName().split(FILE_NAME_SEPARATOR)[1]);
+ if (endIndex != Long.MAX_VALUE) {
+ logger.info("last log data file={} no need to recover",
lastDataFile.getAbsoluteFile());
+ return;
+ }
+
+ success =
recoverTheLastLogDataFile(logDataFileList.get(logDataFileList.size() - 1));
+ if (!success) {
+ logger.error("recover log data file failed, clear all logs in disk,{}",
+ lastDataFile.getAbsoluteFile());
+ for (int i = 0; i < logIndexFileList.size(); i++) {
+ deleteLogDataAndIndexFile(i);
+ }
+ clearFirstLogIndex();
+ }
+ }
+
+ private boolean recoverTheLastLogDataFile(File file) {
+ String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
+ long startIndex = Long.parseLong(splits[0]);
+ Pair<File, Pair<Long, Long>> fileStartAndEndIndex =
getLogIndexFile(startIndex);
+ if (fileStartAndEndIndex.right.left == startIndex) {
+ long endIndex = fileStartAndEndIndex.right.right;
+ String newDataFileName = file.getName()
+ .replaceAll(String.valueOf(Long.MAX_VALUE),
String.valueOf(endIndex));
+ File newLogDataFile = SystemFileFactory.INSTANCE
+ .getFile(file.getParent() + File.separator + newDataFileName);
+ if (!file.renameTo(newLogDataFile)) {
+ logger.error("rename log data file={} failed when recover",
file.getAbsoluteFile());
+ }
+ logDataFileList.remove(logDataFileList.size() - 1);
+ logDataFileList.add(newLogDataFile);
+ return true;
+ }
+ return false;
+ }
+
+ private boolean recoverTheLastLogIndexFile(File file) {
+ logger.debug("start to recover the last log index file={}",
file.getAbsoluteFile());
+ String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
+ long startIndex = Long.parseLong(splits[0]);
+ int longLength = 8;
+ byte[] bytes = new byte[longLength];
+
+ int totalCount = 0;
+ long offset = 0;
+ try (FileInputStream inputStream = new FileInputStream(file)) {
Review comment:
Better to use a buffered stream.
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -593,75 +815,385 @@ public void close() {
}
/**
- * adjust maxRemovedLogSize to the first log file
+ * get file version from file The file name structure is as follows:
+ * {startLogIndex}-{endLogIndex}-{version}-data)
+ *
+ * @param file file
+ * @return version from file
+ */
+ private long getFileVersion(File file) {
+ return Long.parseLong(file.getName().split(FILE_NAME_SEPARATOR)[2]);
+ }
+
+ public void checkDeletePersistRaftLog() {
+ // 1. check the log index offset list size
+ try {
+ lock.writeLock().lock();
+ if (logIndexOffsetList.size() > maxRaftLogIndexSizeInMemory) {
+ int compactIndex = logIndexOffsetList.size() -
maxRaftLogIndexSizeInMemory;
+ logIndexOffsetList.subList(0, compactIndex).clear();
+ firstLogIndex += compactIndex;
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ // 2. check the persist log file number
+ while (logDataFileList.size() > maxNumberOfPersistRaftLogFiles) {
+ deleteLogDataAndIndexFile(0);
+ }
+
+ // 3. check the persist log index number
+ while (!logDataFileList.isEmpty()) {
+ File firstFile = logDataFileList.get(0);
+ String[] splits = firstFile.getName().split(FILE_NAME_SEPARATOR);
+ if (meta.getCommitLogIndex() - Long.parseLong(splits[1]) >
maxPersistRaftLogNumberOnDisk) {
+ deleteLogDataAndIndexFile(0);
+ } else {
+ return;
+ }
+ }
Review comment:
If `maxPersistRaftLogNumberOnDisk` is too small, is it possible that the
current file will be deleted here?
##########
File path:
cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
##########
@@ -593,75 +815,385 @@ public void close() {
}
/**
- * adjust maxRemovedLogSize to the first log file
+ * get file version from file The file name structure is as follows:
+ * {startLogIndex}-{endLogIndex}-{version}-data)
+ *
+ * @param file file
+ * @return version from file
+ */
+ private long getFileVersion(File file) {
+ return Long.parseLong(file.getName().split(FILE_NAME_SEPARATOR)[2]);
+ }
+
+ public void checkDeletePersistRaftLog() {
+ // 1. check the log index offset list size
+ try {
+ lock.writeLock().lock();
+ if (logIndexOffsetList.size() > maxRaftLogIndexSizeInMemory) {
+ int compactIndex = logIndexOffsetList.size() -
maxRaftLogIndexSizeInMemory;
+ logIndexOffsetList.subList(0, compactIndex).clear();
+ firstLogIndex += compactIndex;
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ // 2. check the persist log file number
+ while (logDataFileList.size() > maxNumberOfPersistRaftLogFiles) {
+ deleteLogDataAndIndexFile(0);
+ }
+
+ // 3. check the persist log index number
+ while (!logDataFileList.isEmpty()) {
+ File firstFile = logDataFileList.get(0);
+ String[] splits = firstFile.getName().split(FILE_NAME_SEPARATOR);
+ if (meta.getCommitLogIndex() - Long.parseLong(splits[1]) >
maxPersistRaftLogNumberOnDisk) {
+ deleteLogDataAndIndexFile(0);
+ } else {
+ return;
+ }
+ }
+ }
+
+ private void deleteLogDataAndIndexFile(int index) {
+ File logDataFile = null;
+ File logIndexFile = null;
+ try {
+ lock.writeLock().lock();
+ logDataFile = logDataFileList.get(index);
+ logIndexFile = logIndexFileList.get(index);
+ Files.delete(logDataFile.toPath());
+ Files.delete(logIndexFile.toPath());
+ logDataFileList.remove(index);
+ logIndexFileList.remove(index);
+ logger.debug("delete date file={}, index file={}",
logDataFile.getAbsoluteFile(),
+ logIndexFile.getAbsoluteFile());
+ } catch (IOException e) {
+ logger.error("delete file failed, index={}, data file={}, index
file={}", index,
+ logDataFile == null ? null : logDataFile.getAbsoluteFile(),
+ logIndexFile == null ? null : logIndexFile.getAbsoluteFile());
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * The file name structure is as follows:
{startLogIndex}-{endLogIndex}-{version}-data)
+ *
+ * @param file1 File to compare
+ * @param file2 File to compare
*/
- private void adjustNextThreshold() {
- if (!logFileList.isEmpty()) {
- maxRemovedLogSize = logFileList.get(0).length();
+ private int comparePersistLogFileName(File file1, File file2) {
+ String[] items1 = file1.getName().split(FILE_NAME_SEPARATOR);
+ String[] items2 = file2.getName().split(FILE_NAME_SEPARATOR);
+ if (items1.length != FILE_NAME_PART_LENGTH || items2.length !=
FILE_NAME_PART_LENGTH) {
+ logger.error(
+ "file1={}, file2={} name should be in the following format:
startLogIndex-endLogIndex-version-data",
+ file1.getAbsoluteFile(), file2.getAbsoluteFile());
+ }
+ long startLogIndex1 = Long.parseLong(items1[0]);
+ long startLogIndex2 = Long.parseLong(items2[0]);
+ int res = Long.compare(startLogIndex1, startLogIndex2);
+ if (res == 0) {
+ return Long.compare(Long.parseLong(items1[1]),
Long.parseLong(items2[1]));
}
+ return res;
}
/**
- * actually delete the data file which only contains removed data
+ * @param startIndex the log start index
+ * @param endIndex the log end index
+ * @return the raft log which index between [startIndex, endIndex] or empty
if not found
*/
- private void actuallyDeleteFile() {
- Iterator<File> logFileIterator = logFileList.iterator();
- while (logFileIterator.hasNext()) {
- File logFile = logFileIterator.next();
- if (logger.isDebugEnabled()) {
- logger.debug("Examining file for removal, file: {}, len: {},
removedLogSize: {}", logFile
- , logFile.length(), removedLogSize);
- }
- if (logFile.length() > removedLogSize) {
- break;
- }
-
- logger.info("Removing a log file {}, len: {}, removedLogSize: {}",
logFile,
- logFile.length(), removedLogSize);
- removedLogSize -= logFile.length();
- // if system down before delete, we can use this to delete file during
recovery
- minAvailableVersion = getFileVersion(logFile);
- serializeMeta(meta);
+ @Override
+ public List<Log> getLogs(long startIndex, long endIndex) {
Review comment:
I think we should enforce a limit on this method to avoid out-of-memory
errors when the range is too long.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]