MarcosZyk commented on code in PR #8619: URL: https://github.com/apache/iotdb/pull/8619#discussion_r1058752578
########## schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/flush/MemChunkGroupFlush.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.flush; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.request.FlushRequest; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.response.FlushResponse; +import org.apache.iotdb.lsm.annotation.FlushProcessor; +import org.apache.iotdb.lsm.context.requestcontext.FlushRequestContext; +import org.apache.iotdb.lsm.levelProcess.FlushLevelProcessor; +import org.apache.iotdb.lsm.sstable.bplustree.writer.BPlusTreeWriter; +import org.apache.iotdb.lsm.sstable.fileIO.FileOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** flush for MemChunkGroup */ +@FlushProcessor(level = 1) +public class MemChunkGroupFlush extends FlushLevelProcessor<MemChunkGroup, MemChunk, FlushRequest> 
{ + + @Override + public List<MemChunk> getChildren( + MemChunkGroup memNode, FlushRequest request, FlushRequestContext context) { + return new ArrayList<>(memNode.getMemChunkMap().values()); Review Comment: Is this ```new ArrayList<>()``` really necessary? It seems there's no usage relying on the index of the returned list. Maybe just returning a Collection is enough, which also saves the copy operation. ########## schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/flush/MemChunkGroupFlush.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.flush; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.request.FlushRequest; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.response.FlushResponse; +import org.apache.iotdb.lsm.annotation.FlushProcessor; +import org.apache.iotdb.lsm.context.requestcontext.FlushRequestContext; +import org.apache.iotdb.lsm.levelProcess.FlushLevelProcessor; +import org.apache.iotdb.lsm.sstable.bplustree.writer.BPlusTreeWriter; +import org.apache.iotdb.lsm.sstable.fileIO.FileOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** flush for MemChunkGroup */ +@FlushProcessor(level = 1) +public class MemChunkGroupFlush extends FlushLevelProcessor<MemChunkGroup, MemChunk, FlushRequest> { + + @Override + public List<MemChunk> getChildren( + MemChunkGroup memNode, FlushRequest request, FlushRequestContext context) { + return new ArrayList<>(memNode.getMemChunkMap().values()); + } + + @Override + public void flush(MemChunkGroup memNode, FlushRequest request, FlushRequestContext context) + throws IOException { + List<MemChunk> memChunks = getChildren(memNode, null, context); + Map<MemChunk, String> memChunkGroupMapReverse = + memNode.getMemChunkMap().entrySet().stream() + .collect(HashMap::new, (m, v) -> m.put(v.getValue(), v.getKey()), HashMap::putAll); + Map<String, Long> tagValueToOffset = new HashMap<>(); + FlushResponse flushResponse = context.getResponse(); + for (MemChunk memChunk : memChunks) { + tagValueToOffset.put( + memChunkGroupMapReverse.get(memChunk), flushResponse.getChunkOffset(memChunk)); + } Review Comment: Why do we need this reverse map? 
The key of flushResponse may be replaced by tagValue if we pass the tagValue in context when doing MemChunkFlush. ########## schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/flush/MemTableFlush.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.flush; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.config.SchemaRegionConstant; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.file.entry.TiFileHeader; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemTable; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.request.FlushRequest; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.response.FlushResponse; +import org.apache.iotdb.lsm.annotation.FlushProcessor; +import org.apache.iotdb.lsm.context.requestcontext.FlushRequestContext; +import org.apache.iotdb.lsm.levelProcess.FlushLevelProcessor; +import org.apache.iotdb.lsm.sstable.bplustree.writer.BPlusTreeWriter; +import org.apache.iotdb.lsm.sstable.fileIO.FileOutput; +import org.apache.iotdb.lsm.util.BloomFilter; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** flush for MemTable */ +@FlushProcessor(level = 0) +public class MemTableFlush extends FlushLevelProcessor<MemTable, MemChunkGroup, FlushRequest> { + @Override + public List<MemChunkGroup> getChildren( + MemTable memNode, FlushRequest request, FlushRequestContext context) { + return new ArrayList<>(memNode.getMemChunkGroupMap().values()); + } + + @Override + public void flush(MemTable memNode, FlushRequest flushRequest, FlushRequestContext context) + throws IOException { + List<MemChunkGroup> memChunkGroups = getChildren(memNode, null, context); + Map<MemChunkGroup, String> memChunkGroupMapReverse = + memNode.getMemChunkGroupMap().entrySet().stream() + .collect(HashMap::new, (m, v) -> m.put(v.getValue(), v.getKey()), HashMap::putAll); + Map<String, Long> tagKeyToOffset = new HashMap<>(); + 
FlushResponse flushResponse = context.getResponse(); + for (MemChunkGroup memChunkGroup : memChunkGroups) { + tagKeyToOffset.put( + memChunkGroupMapReverse.get(memChunkGroup), flushResponse.getTagKeyOffset(memChunkGroup)); + } Review Comment: The reverse operation may be optimized. ########## schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/tagIndex/flush/MemTableFlush.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.flush; + +import org.apache.iotdb.db.metadata.tagSchemaRegion.config.SchemaRegionConstant; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.file.entry.TiFileHeader; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunk; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemChunkGroup; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.memtable.MemTable; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.request.FlushRequest; +import org.apache.iotdb.db.metadata.tagSchemaRegion.tagIndex.response.FlushResponse; +import org.apache.iotdb.lsm.annotation.FlushProcessor; +import org.apache.iotdb.lsm.context.requestcontext.FlushRequestContext; +import org.apache.iotdb.lsm.levelProcess.FlushLevelProcessor; +import org.apache.iotdb.lsm.sstable.bplustree.writer.BPlusTreeWriter; +import org.apache.iotdb.lsm.sstable.fileIO.FileOutput; +import org.apache.iotdb.lsm.util.BloomFilter; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** flush for MemTable */ +@FlushProcessor(level = 0) +public class MemTableFlush extends FlushLevelProcessor<MemTable, MemChunkGroup, FlushRequest> { + @Override + public List<MemChunkGroup> getChildren( + MemTable memNode, FlushRequest request, FlushRequestContext context) { + return new ArrayList<>(memNode.getMemChunkGroupMap().values()); + } + + @Override + public void flush(MemTable memNode, FlushRequest flushRequest, FlushRequestContext context) + throws IOException { + List<MemChunkGroup> memChunkGroups = getChildren(memNode, null, context); + Map<MemChunkGroup, String> memChunkGroupMapReverse = + memNode.getMemChunkGroupMap().entrySet().stream() + .collect(HashMap::new, (m, v) -> m.put(v.getValue(), v.getKey()), HashMap::putAll); + Map<String, Long> tagKeyToOffset = new HashMap<>(); + 
FlushResponse flushResponse = context.getResponse(); + for (MemChunkGroup memChunkGroup : memChunkGroups) { + tagKeyToOffset.put( + memChunkGroupMapReverse.get(memChunkGroup), flushResponse.getTagKeyOffset(memChunkGroup)); + } + FileOutput fileOutput = context.getFileOutput(); + BPlusTreeWriter bPlusTreeWriter = new BPlusTreeWriter(fileOutput); + TiFileHeader tiFileHeader = new TiFileHeader(); + tiFileHeader.setTagKeyIndexOffset(bPlusTreeWriter.write(tagKeyToOffset, false)); + List<String> tagKeyAndValues = getTagKeyAndValues(memNode); + BloomFilter bloomFilter = BloomFilter.getEmptyBloomFilter(0.05, 3); + for (String key : tagKeyAndValues) { + bloomFilter.add(key); + } Review Comment: Maybe we can pass one key and a list of value into BloomFilter each time for construction. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
