MarcosZyk commented on code in PR #8619: URL: https://github.com/apache/iotdb/pull/8619#discussion_r1063937040
########## schema-engine-tag/src/main/java/org/apache/iotdb/lsm/manager/FlushManager.java: ########## @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.lsm.manager; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.db.metadata.tagSchemaRegion.config.SchemaRegionConstant; +import org.apache.iotdb.lsm.context.requestcontext.FlushRequestContext; +import org.apache.iotdb.lsm.request.IFlushRequest; +import org.apache.iotdb.lsm.sstable.fileIO.TiFileOutputStream; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class FlushManager<T, R extends IFlushRequest> + extends BasicLSMManager<T, R, FlushRequestContext> { + + // use wal manager object to write wal file on deletion + private WALManager walManager; + + private IMemManager memManager; + + private ScheduledExecutorService checkFlushThread; + + private String flushDirPath; + + private String flushFilePrefix; + + private final int flushIntervalMs = 1000; + + public 
FlushManager( + WALManager walManager, T memManager, String flushDirPath, String flushFilePrefix) { + this.walManager = walManager; + if (!(memManager instanceof IMemManager)) { + throw new ClassCastException("memManager must implement IMemManager"); + } + this.memManager = (IMemManager) memManager; + this.flushDirPath = flushDirPath; + File flushDir = new File(flushDirPath); + flushDir.mkdirs(); + this.flushFilePrefix = flushFilePrefix; + checkFlushThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("LSM-Flush-Service"); + ScheduledExecutorUtil.safelyScheduleAtFixedRate( + checkFlushThread, + this::checkFlush, + flushIntervalMs, + flushIntervalMs, + TimeUnit.MILLISECONDS); + } + + public void checkFlush() { + synchronized (memManager) { + if (memManager.isNeedFlush()) { + List<R> flushRequests = memManager.getFlushRequests(); + for (R flushRequest : flushRequests) { + flushRequest.setFlushDirPath(flushDirPath); + flushRequest.setFlushFileName(flushFilePrefix + "-0-" + flushRequest.getIndex()); + flushRequest.setFlushDeleteFileName( + flushFilePrefix + "-delete" + "-0-" + flushRequest.getIndex()); + flush(flushRequest); + } + } + } + } + + private void updateWal(R request) { + int index = request.getIndex(); + walManager.deleteWalFile(index); + } + + private void renameFlushFile(T root, R request, FlushRequestContext context) { + TiFileOutputStream fileOutput = context.getFileOutput(); + try { + fileOutput.close(); + String flushFileName = request.getFlushFileName() + SchemaRegionConstant.TMP; + File flushFile = new File(this.flushDirPath, flushFileName); + File newFlushFile = new File(this.flushDirPath, request.getFlushFileName()); + flushFile.renameTo(newFlushFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void flush(R flushRequest) { + FlushRequestContext flushRequestBaseContext = new FlushRequestContext(); + process((T) flushRequest.getMemNode(), flushRequest, flushRequestBaseContext); + } + + @Override + 
public void preProcess(T root, R request, FlushRequestContext context) { + String flushFileName = request.getFlushFileName() + SchemaRegionConstant.TMP; + File flushFile = new File(this.flushDirPath, flushFileName); + try { + if (!flushFile.exists()) { + flushFile.createNewFile(); + } + context.setFileOutput(new TiFileOutputStream(flushFile)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void postProcess(T root, R request, FlushRequestContext context) { + memManager.removeMemData(request); Review Comment: If an exception occurs while deleting the WAL or renaming the tmp file, we will lose both the memory data and the disk data, so we should delete the memory data as the last step. ########## schema-engine-tag/src/main/java/org/apache/iotdb/lsm/manager/WALManager.java: ########## @@ -39,33 +44,145 @@ public abstract class WALManager { // directly use the wal reader that comes with the lsm framework private WALReader walReader; + private String[] walFileNames; + + private int currentFileIndex = 0; + + protected int currentFileID = 0; + + protected String walFilePrefix; + private boolean recover; - public WALManager(String schemaDirPath) { - this.walDirPath = schemaDirPath; + private String flushDirPath; + + private String flushFilePrefix; + + public WALManager(String walDirPath) { + this.walDirPath = walDirPath; } public WALManager( String walDirPath, - String walFileName, - int walBufferSize, - IWALRecord walRecord, - boolean forceEachWrite) + String walFilePrefix, + String flushDirPath, + String flushFilePrefix, + IWALRecord walRecord) throws IOException { this.walDirPath = walDirPath; - initFile(walDirPath, walFileName); - walWriter = new WALWriter(walFile, walBufferSize, forceEachWrite); - walReader = new WALReader(walFile, walRecord); + this.walFilePrefix = walFilePrefix; + this.flushDirPath = flushDirPath; + this.flushFilePrefix = flushFilePrefix; + initRecover(walRecord); recover = false; } - private void initFile(String 
walDirPath, String walFileName) throws IOException { - File schemaDir = new File(walDirPath); - schemaDir.mkdirs(); + public void initRecover(IWALRecord walRecord) throws IOException { + checkPoint(); + File walDir = new File(walDirPath); + walDir.mkdirs(); + File[] walFiles = walDir.listFiles(); + walFileNames = + Arrays.stream(walFiles) + .map(this::getWalFileID) + .sorted() + .map(this::getWalFileName) + .toArray(String[]::new); + String walFileName; + if (walFileNames.length == 0) { + walFileName = getWalFileName(currentFileID); + } else { + walFileName = walFileNames[currentFileIndex]; + } + initWalWriterAndReader(initFile(walFileName), walRecord); + } + + private void checkPoint() { + File flushDir = new File(flushDirPath); + flushDir.mkdirs(); + File[] flushFiles = flushDir.listFiles(); + String[] flushTmpFileNames = + Arrays.stream(flushFiles) + .map(File::getName) + .filter(name -> name.endsWith(SchemaRegionConstant.TMP)) + .toArray(String[]::new); + Integer[] flushTmpIDs = + Arrays.stream(flushTmpFileNames).map(this::getFlushFileID).toArray(Integer[]::new); + File walDir = new File(walDirPath); + walDir.mkdirs(); + File[] walFiles = walDir.listFiles(); + Set<Integer> walFileIDs = + Arrays.stream(walFiles).map(this::getWalFileID).collect(Collectors.toSet()); + for (int i = 0; i < flushTmpFileNames.length; i++) { + // have tifile tmp don't have wal, tifile is complete, need rename tifile. + File flushTmpFile = new File(flushDirPath + File.separator + flushTmpFileNames[i]); + String flushFileName = + flushTmpFileNames[i].substring( + 0, flushTmpFileNames[i].length() - SchemaRegionConstant.TMP.length()); + if (!walFileIDs.contains(flushTmpIDs[i])) { + File flushFile = new File(flushDirPath + File.separator + flushFileName); + flushTmpFile.renameTo(flushFile); + } + // have tifile tmp and have wal, tifile is incomplete,delete tifile tmp and delete file. 
+ else { + flushTmpFile.delete(); + String flushDeleteFileName = + ConvertUtils.getFlushDeleteFileNameFromFlushFileName(flushFileName); + File flushDeleteFile = new File(flushDirPath + File.separator + flushDeleteFileName); + if (flushDeleteFile.exists()) { + flushDeleteFile.delete(); + } + } + } + } Review Comment: It might be better to move this to RecoverManager, since this method processes more than just WAL files. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
