This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a074cde mark and sweep indices for V3 segment format (#7301) a074cde is described below commit a074cde132d69e0ad03d64f6c07b74f822370fe0 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Fri Aug 20 15:47:39 2021 -0700 mark and sweep indices for V3 segment format (#7301) The general idea for cleaning up indices in the V3 segment format consists of two steps: mark and sweep. Calling removeIndex() marks the indices to be removed; the sweep happens in the close() method of SingleFileIndexDirectory. The index creation handlers are expected to call removeIndex() after comparing the local indices with those set in the table config. There is no failure handling during the sweep. The sweep is expected to run during segment reloading, which already has failure handling (in short, a backup folder is created before reloading, and the segment is restored from that backup upon any failure). --- .../local/segment/store/FilePerIndexDirectory.java | 14 +- .../segment/local/segment/store/IndexEntry.java | 8 +- .../segment/local/segment/store/IndexKey.java | 14 +- .../segment/store/SegmentLocalFSDirectory.java | 5 - .../segment/store/SingleFileIndexDirectory.java | 147 ++++++++++++----- .../store/{IndexEntry.java => TextIndexUtils.java} | 30 ++-- .../segment/store/FilePerIndexDirectoryTest.java | 66 +++++++- .../segment/local/segment/store/IndexKeyTest.java | 43 +++++ .../store/SingleFileIndexDirectoryTest.java | 179 ++++++++++++++++++++- .../segment/spi/store/ColumnIndexDirectory.java | 6 - .../pinot/segment/spi/store/SegmentDirectory.java | 6 - 11 files changed, 429 insertions(+), 89 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java index 43d6fd2..35cdf3c 100644 --- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; @@ -95,18 +96,15 @@ class FilePerIndexDirectory extends ColumnIndexDirectory { @Override public void removeIndex(String columnName, ColumnIndexType indexType) { - File indexFile = getFileFor(columnName, indexType); - if (indexFile.delete()) { - _indexBuffers.remove(new IndexKey(columnName, indexType)); + _indexBuffers.remove(new IndexKey(columnName, indexType)); + if (indexType == ColumnIndexType.TEXT_INDEX) { + TextIndexUtils.cleanupTextIndex(_segmentDirectory, columnName); + } else { + FileUtils.deleteQuietly(getFileFor(columnName, indexType)); } } @Override - public boolean isIndexRemovalSupported() { - return true; - } - - @Override public Set<String> getColumnsWithIndex(ColumnIndexType type) { Set<String> columns = new HashSet<>(); for (IndexKey indexKey : _indexBuffers.keySet()) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java index 9a0de05..bf20cf9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory; class IndexEntry { private static final Logger LOGGER = LoggerFactory.getLogger(IndexEntry.class); - IndexKey _key; + final IndexKey _key; long _startOffset = -1; 
long _size = -1; PinotDataBuffer _buffer; @@ -36,6 +36,12 @@ class IndexEntry { _key = key; } + public IndexEntry(IndexKey key, long startOffset, long size) { + _key = key; + _startOffset = startOffset; + _size = size; + } + @Override public String toString() { return _key.toString() + " : [" + _startOffset + "," + (_startOffset + _size) + ")"; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java index ffd78b3..a81cb9d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java @@ -26,11 +26,11 @@ import org.slf4j.LoggerFactory; /** * Class representing index name and type */ -public class IndexKey { +public class IndexKey implements Comparable<IndexKey> { private static final Logger LOGGER = LoggerFactory.getLogger(IndexKey.class); - String _name; - ColumnIndexType _type; + final String _name; + final ColumnIndexType _type; /** * @param name column name @@ -69,4 +69,12 @@ public class IndexKey { public String toString() { return _name + "." 
+ _type.getIndexName(); } + + @Override + public int compareTo(IndexKey o) { + if (_name.equals(o._name)) { + return _type.compareTo(o._type); + } + return _name.compareTo(o._name); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java index 347ad05..3729514 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java @@ -337,11 +337,6 @@ public class SegmentLocalFSDirectory extends SegmentDirectory { } @Override - public boolean isIndexRemovalSupported() { - return _columnIndexDirectory.isIndexRemovalSupported(); - } - - @Override public void removeIndex(String columnName, ColumnIndexType indexType) { _columnIndexDirectory.removeIndex(columnName, indexType); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java index aae4bc7..024c5d6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java @@ -18,14 +18,16 @@ */ package org.apache.pinot.segment.local.segment.store; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; -import java.io.FilenameFilter; import java.io.IOException; import java.io.PrintWriter; +import java.io.RandomAccessFile; import java.nio.ByteOrder; +import java.nio.channels.FileChannel; import java.util.ArrayList; 
import java.util.HashMap; import java.util.HashSet; @@ -36,6 +38,7 @@ import java.util.SortedMap; import java.util.TreeMap; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -84,6 +87,13 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory { private final Map<IndexKey, IndexEntry> _columnEntries; private final List<PinotDataBuffer> _allocBuffers; + // For V3 segment format, the index cleanup consists of two steps: mark and sweep. + // The removeIndex() method marks an index to be removed; and the index info is + // deleted from _columnEntries so that it becomes unavailable from now on. Then, + // The cleanupRemovedIndices() method cleans up the marked indices from disk and + // re-arranges the content in index file to keep it compact. + private boolean _shouldCleanupRemovedIndices; + /** * @param segmentDirectory File pointing to segment directory * @param segmentMetadata segment metadata. 
Metadata must be fully initialized @@ -131,28 +141,12 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory { @Override public boolean hasIndexFor(String column, ColumnIndexType type) { if (type == ColumnIndexType.TEXT_INDEX) { - return hasTextIndex(column); + return TextIndexUtils.hasTextIndex(_segmentDirectory, column); } IndexKey key = new IndexKey(column, type); return _columnEntries.containsKey(key); } - private boolean hasTextIndex(String column) { - String suffix = V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION; - File[] textIndexFiles = _segmentDirectory.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.equals(column + suffix); - } - }); - if (textIndexFiles.length > 0) { - Preconditions.checkState(textIndexFiles.length == 1, - "Illegal number of text index directories for columns " + column + " segment directory " + _segmentDirectory.getAbsolutePath()); - return true; - } - return false; - } - private PinotDataBuffer checkAndGetIndexBuffer(String column, ColumnIndexType type) { IndexKey key = new IndexKey(column, type); IndexEntry entry = _columnEntries.get(key); @@ -320,46 +314,65 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory { throws IOException { File mapFile = new File(_segmentDirectory, V1Constants.INDEX_MAP_FILE_NAME); try (PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(mapFile, true)))) { - String startKey = getKey(entry._key._name, entry._key._type.getIndexName(), true); - - StringBuilder sb = new StringBuilder(); - sb.append(startKey).append(" = ").append(entry._startOffset); - writer.println(sb.toString()); - - String endKey = getKey(entry._key._name, entry._key._type.getIndexName(), false); - sb = new StringBuilder(); - sb.append(endKey).append(" = ").append(entry._size); - writer.println(sb.toString()); + persistIndexMap(entry, writer); } } - private String getKey(String column, String indexName, boolean isStartOffset) { - return 
column + MAP_KEY_SEPARATOR + indexName + MAP_KEY_SEPARATOR + (isStartOffset ? "startOffset" : "size"); - } - private String allocationContext(IndexKey key) { return this.getClass().getSimpleName() + key.toString(); } + /** + * This method sweeps the indices marked for removal. Exception is simply bubbled up w/o + * trying to recover disk states from failure. This method is expected to run during segment + * reloading, which has failure handling by creating a backup folder before doing reloading. + */ + private void cleanupRemovedIndices() + throws IOException { + File tmpIdxFile = new File(_segmentDirectory, V1Constants.INDEX_FILE_NAME + ".tmp"); + // Sort indices by column name and index type while copying, so that the + // new index_map file is easy to inspect for troubleshooting. + List<IndexEntry> retained = copyIndices(_indexFile, tmpIdxFile, new TreeMap<>(_columnEntries)); + + FileUtils.deleteQuietly(_indexFile); + Preconditions + .checkState(tmpIdxFile.renameTo(_indexFile), "Failed to rename temp index file: %s to original index file: %s", + tmpIdxFile, _indexFile); + + File mapFile = new File(_segmentDirectory, V1Constants.INDEX_MAP_FILE_NAME); + FileUtils.deleteQuietly(mapFile); + try (PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(mapFile)))) { + persistIndexMaps(retained, writer); + } + } + @Override public void close() throws IOException { for (PinotDataBuffer buf : _allocBuffers) { buf.close(); } + // Cleanup removed indices after closing and flushing buffers, so + // that potential index updates can be persisted across cleanups. + if (_shouldCleanupRemovedIndices) { + cleanupRemovedIndices(); + } _columnEntries.clear(); _allocBuffers.clear(); } @Override public void removeIndex(String columnName, ColumnIndexType indexType) { - throw new UnsupportedOperationException( - "Index removal is not supported for single file index format. 
Requested colum: " + columnName + " indexType: " + indexType); - } - - @Override - public boolean isIndexRemovalSupported() { - return false; + // Text index is kept in its own files, thus can be removed directly. + if (indexType == ColumnIndexType.TEXT_INDEX) { + TextIndexUtils.cleanupTextIndex(_segmentDirectory, columnName); + return; + } + // Only remember to cleanup indices upon close(), if any existing + // index gets marked for removal. + if (_columnEntries.remove(new IndexKey(columnName, indexType)) != null) { + _shouldCleanupRemovedIndices = true; + } } @Override @@ -377,4 +390,60 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory { public String toString() { return _segmentDirectory.toString() + "/" + _indexFile.toString(); } + + /** + * Copy indices, as specified in the Map, from src file to dest file. The Map contains info + * about where to find the index data in the src file, like startOffsets and data sizes. The + * indices are packed together in the dest file, and their positions are returned. + * + * @param srcFile contains indices to copy to dest file, and it may contain other data to leave behind. + * @param destFile holds the indices copied from src file, and those indices appended one after another. + * @param indicesToCopy specifies where to find the indices in the src file, with offset and size info. + * @return the offsets and sizes for the indices in the dest file. + * @throws IOException from FileChannels upon failure to r/w the index files, and simply raised to caller. + */ + @VisibleForTesting + static List<IndexEntry> copyIndices(File srcFile, File destFile, Map<IndexKey, IndexEntry> indicesToCopy) + throws IOException { + // Copy index from original index file and append to temp file. + // Keep track of the index entry pointing to the temp index file. + List<IndexEntry> retained = new ArrayList<>(); + long nextOffset = 0; + // With FileChannel, we can seek to the data flexibly. 
+ try (FileChannel srcCh = new RandomAccessFile(srcFile, "r").getChannel(); + FileChannel dstCh = new RandomAccessFile(destFile, "rw").getChannel()) { + for (IndexEntry index : indicesToCopy.values()) { + srcCh.transferTo(index._startOffset, index._size, dstCh); + retained.add(new IndexEntry(index._key, nextOffset, index._size)); + nextOffset += index._size; + } + } + return retained; + } + + private static String getKey(String column, String indexName, boolean isStartOffset) { + return column + MAP_KEY_SEPARATOR + indexName + MAP_KEY_SEPARATOR + (isStartOffset ? "startOffset" : "size"); + } + + @VisibleForTesting + static void persistIndexMaps(List<IndexEntry> entries, PrintWriter writer) { + for (IndexEntry entry : entries) { + persistIndexMap(entry, writer); + } + } + + private static void persistIndexMap(IndexEntry entry, PrintWriter writer) { + String colName = entry._key._name; + String idxType = entry._key._type.getIndexName(); + + String startKey = getKey(colName, idxType, true); + StringBuilder sb = new StringBuilder(); + sb.append(startKey).append(" = ").append(entry._startOffset); + writer.println(sb); + + String endKey = getKey(colName, idxType, false); + sb = new StringBuilder(); + sb.append(endKey).append(" = ").append(entry._size); + writer.println(sb); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/TextIndexUtils.java similarity index 53% copy from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java copy to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/TextIndexUtils.java index 9a0de05..969f414 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/TextIndexUtils.java @@ -18,26 +18,24 @@ */ 
package org.apache.pinot.segment.local.segment.store; -import org.apache.pinot.segment.spi.memory.PinotDataBuffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.File; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.spi.V1Constants; -/* package-private */ -class IndexEntry { - private static final Logger LOGGER = LoggerFactory.getLogger(IndexEntry.class); - - IndexKey _key; - long _startOffset = -1; - long _size = -1; - PinotDataBuffer _buffer; +class TextIndexUtils { + private TextIndexUtils() { + } - public IndexEntry(IndexKey key) { - _key = key; + static void cleanupTextIndex(File segDir, String column) { + // Remove the lucene index file and potentially the docId mapping file. + File idxFile = new File(segDir, column + V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION); + FileUtils.deleteQuietly(idxFile); + File mapFile = new File(segDir, column + V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION); + FileUtils.deleteQuietly(mapFile); } - @Override - public String toString() { - return _key.toString() + " : [" + _startOffset + "," + (_startOffset + _size) + ")"; + static boolean hasTextIndex(File segDir, String column) { + return new File(segDir, column + V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION).exists(); } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java index 909a235..0733449 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java @@ -22,8 +22,12 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import 
org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; +import org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader; +import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -31,6 +35,7 @@ import org.apache.pinot.segment.spi.store.ColumnIndexDirectory; import org.apache.pinot.segment.spi.store.ColumnIndexType; import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.util.TestUtils; +import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -157,13 +162,72 @@ public class FilePerIndexDirectoryTest { fpi.newBuffer("col2", ColumnIndexType.DICTIONARY, 100); assertTrue(fpi.getFileFor("col1", ColumnIndexType.FORWARD_INDEX).exists()); assertTrue(fpi.getFileFor("col2", ColumnIndexType.DICTIONARY).exists()); - assertTrue(fpi.isIndexRemovalSupported()); fpi.removeIndex("col1", ColumnIndexType.FORWARD_INDEX); assertFalse(fpi.getFileFor("col1", ColumnIndexType.FORWARD_INDEX).exists()); } } @Test + public void testRemoveTextIndices() + throws IOException { + try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap); + LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", TEMP_DIR, true); + LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", TEMP_DIR, true)) { + PinotDataBuffer buf = fpi.newBuffer("col1", ColumnIndexType.FORWARD_INDEX, 1024); + buf.putInt(0, 1); + + buf = fpi.newBuffer("col1", ColumnIndexType.DICTIONARY, 1024); + buf.putChar(111, 'h'); + + fooCreator.add("{\"clean\":\"this\"}"); + fooCreator.seal(); + barCreator.add("{\"retain\":\"this\"}"); + 
barCreator.add("{\"keep\":\"this\"}"); + barCreator.add("{\"hold\":\"this\"}"); + barCreator.seal(); + } + + // Remove the Text index to trigger cleanup. + try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { + assertTrue(fpi.hasIndexFor("foo", ColumnIndexType.TEXT_INDEX)); + // Use TextIndex once to trigger the creation of mapping files. + LuceneTextIndexReader fooReader = new LuceneTextIndexReader("foo", TEMP_DIR, 1, new HashMap<>()); + fooReader.getDocIds("clean"); + LuceneTextIndexReader barReader = new LuceneTextIndexReader("bar", TEMP_DIR, 3, new HashMap<>()); + barReader.getDocIds("retain hold"); + + // Both files for TextIndex should be removed. + fpi.removeIndex("foo", ColumnIndexType.TEXT_INDEX); + assertFalse(new File(TEMP_DIR, "foo" + V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION).exists()); + assertFalse( + new File(TEMP_DIR, "foo" + V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists()); + } + assertTrue(new File(TEMP_DIR, "bar" + V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION).exists()); + assertTrue(new File(TEMP_DIR, "bar" + V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists()); + + // Read indices back and check the content. + try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { + assertFalse(fpi.hasIndexFor("foo", ColumnIndexType.TEXT_INDEX)); + + assertTrue(fpi.hasIndexFor("col1", ColumnIndexType.FORWARD_INDEX)); + PinotDataBuffer buf = fpi.getBuffer("col1", ColumnIndexType.FORWARD_INDEX); + assertEquals(buf.getInt(0), 1); + + assertTrue(fpi.hasIndexFor("col1", ColumnIndexType.DICTIONARY)); + buf = fpi.getBuffer("col1", ColumnIndexType.DICTIONARY); + assertEquals(buf.getChar(111), 'h'); + + assertTrue(fpi.hasIndexFor("bar", ColumnIndexType.TEXT_INDEX)); + + // Check if the text index still work. 
+ LuceneTextIndexReader barReader = new LuceneTextIndexReader("bar", TEMP_DIR, 3, new HashMap<>()); + MutableRoaringBitmap ids = barReader.getDocIds("retain hold"); + assertTrue(ids.contains(0)); + assertTrue(ids.contains(2)); + } + } + + @Test public void testGetColumnIndices() throws IOException { try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/IndexKeyTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/IndexKeyTest.java new file mode 100644 index 0000000..cf76411 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/IndexKeyTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.segment.store; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.pinot.segment.spi.store.ColumnIndexType; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class IndexKeyTest { + @Test + public void testCompareTo() { + List<IndexKey> iks = Arrays + .asList(new IndexKey("foo", ColumnIndexType.INVERTED_INDEX), new IndexKey("bar", ColumnIndexType.BLOOM_FILTER), + new IndexKey("foo", ColumnIndexType.FORWARD_INDEX), new IndexKey("bar", ColumnIndexType.DICTIONARY), + new IndexKey("baz", ColumnIndexType.JSON_INDEX), new IndexKey("baz", ColumnIndexType.FST_INDEX)); + Collections.sort(iks); + assertEquals(iks, Arrays + .asList(new IndexKey("bar", ColumnIndexType.DICTIONARY), new IndexKey("bar", ColumnIndexType.BLOOM_FILTER), + new IndexKey("baz", ColumnIndexType.FST_INDEX), new IndexKey("baz", ColumnIndexType.JSON_INDEX), + new IndexKey("foo", ColumnIndexType.FORWARD_INDEX), new IndexKey("foo", ColumnIndexType.INVERTED_INDEX))); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java index 81422d9..e31a3dd 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java @@ -18,13 +18,25 @@ */ package org.apache.pinot.segment.local.segment.store; +import com.google.common.collect.ImmutableMap; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.PrintWriter; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; +import 
java.util.TreeMap; +import java.util.UUID; +import java.util.stream.Collectors; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator; +import org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader; +import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.creator.SegmentVersion; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -33,12 +45,14 @@ import org.apache.pinot.segment.spi.store.ColumnIndexType; import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.util.TestUtils; import org.mockito.Mockito; +import org.roaringbitmap.buffer.MutableRoaringBitmap; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; public class SingleFileIndexDirectoryTest { @@ -155,17 +169,168 @@ public class SingleFileIndexDirectoryTest { } } - @Test(expectedExceptions = UnsupportedOperationException.class) + @Test public void testRemoveIndex() throws IOException, ConfigurationException { try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { sfd.newBuffer("col1", ColumnIndexType.DICTIONARY, 1024); - assertFalse(sfd.isIndexRemovalSupported()); sfd.removeIndex("col1", ColumnIndexType.DICTIONARY); + assertFalse(sfd.hasIndexFor("col1", ColumnIndexType.DICTIONARY)); + } + } + + @Test + public void testCleanupRemovedIndices() + throws IOException, ConfigurationException { + try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { + PinotDataBuffer buf = sfd.newBuffer("col1", 
ColumnIndexType.FORWARD_INDEX, 1024); + buf.putInt(0, 1); // from begin position. + + buf = sfd.newBuffer("col1", ColumnIndexType.DICTIONARY, 1024); + buf.putChar(111, 'h'); + + buf = sfd.newBuffer("col2", ColumnIndexType.FORWARD_INDEX, 1024); + buf.putChar(222, 'w'); + + buf = sfd.newBuffer("col1", ColumnIndexType.JSON_INDEX, 1024); + buf.putLong(333, 111111L); + + buf = sfd.newBuffer("col2", ColumnIndexType.H3_INDEX, 1024); + buf.putDouble(1016, 222.222); // touch end position. + } + + // Remove the JSON index to trigger cleanup, but keep H3 index. + try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { + assertTrue(sfd.hasIndexFor("col1", ColumnIndexType.JSON_INDEX)); + sfd.removeIndex("col1", ColumnIndexType.JSON_INDEX); + } + + // Read indices back and check the content. + try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { + assertFalse(sfd.hasIndexFor("col1", ColumnIndexType.JSON_INDEX)); + + assertTrue(sfd.hasIndexFor("col1", ColumnIndexType.FORWARD_INDEX)); + PinotDataBuffer buf = sfd.getBuffer("col1", ColumnIndexType.FORWARD_INDEX); + assertEquals(buf.getInt(0), 1); + + assertTrue(sfd.hasIndexFor("col1", ColumnIndexType.DICTIONARY)); + buf = sfd.getBuffer("col1", ColumnIndexType.DICTIONARY); + assertEquals(buf.getChar(111), 'h'); + + assertTrue(sfd.hasIndexFor("col2", ColumnIndexType.FORWARD_INDEX)); + buf = sfd.getBuffer("col2", ColumnIndexType.FORWARD_INDEX); + assertEquals(buf.getChar(222), 'w'); + + assertTrue(sfd.hasIndexFor("col2", ColumnIndexType.H3_INDEX)); + buf = sfd.getBuffer("col2", ColumnIndexType.H3_INDEX); + assertEquals(buf.getDouble(1016), 222.222); + } + } + + @Test + public void testRemoveTextIndices() + throws IOException, ConfigurationException { + try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap); + LuceneTextIndexCreator fooCreator = new 
LuceneTextIndexCreator("foo", TEMP_DIR, true); + LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", TEMP_DIR, true)) { + PinotDataBuffer buf = sfd.newBuffer("col1", ColumnIndexType.FORWARD_INDEX, 1024); + buf.putInt(0, 1); + + buf = sfd.newBuffer("col1", ColumnIndexType.DICTIONARY, 1024); + buf.putChar(111, 'h'); + + fooCreator.add("{\"clean\":\"this\"}"); + fooCreator.seal(); + barCreator.add("{\"retain\":\"this\"}"); + barCreator.add("{\"keep\":\"this\"}"); + barCreator.add("{\"hold\":\"this\"}"); + barCreator.seal(); + } + + // Remove the Text index to trigger cleanup. + try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { + assertTrue(sfd.hasIndexFor("foo", ColumnIndexType.TEXT_INDEX)); + // Use TextIndex once to trigger the creation of mapping files. + LuceneTextIndexReader fooReader = new LuceneTextIndexReader("foo", TEMP_DIR, 1, new HashMap<>()); + fooReader.getDocIds("clean"); + LuceneTextIndexReader barReader = new LuceneTextIndexReader("bar", TEMP_DIR, 3, new HashMap<>()); + barReader.getDocIds("retain hold"); + + // Both files for TextIndex should be removed. + sfd.removeIndex("foo", ColumnIndexType.TEXT_INDEX); + assertFalse(new File(TEMP_DIR, "foo" + V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION).exists()); + assertFalse( + new File(TEMP_DIR, "foo" + V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists()); + } + assertTrue(new File(TEMP_DIR, "bar" + V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION).exists()); + assertTrue(new File(TEMP_DIR, "bar" + V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists()); + + // Read indices back and check the content. 
+ try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { + assertFalse(sfd.hasIndexFor("foo", ColumnIndexType.TEXT_INDEX)); + + assertTrue(sfd.hasIndexFor("col1", ColumnIndexType.FORWARD_INDEX)); + PinotDataBuffer buf = sfd.getBuffer("col1", ColumnIndexType.FORWARD_INDEX); + assertEquals(buf.getInt(0), 1); + + assertTrue(sfd.hasIndexFor("col1", ColumnIndexType.DICTIONARY)); + buf = sfd.getBuffer("col1", ColumnIndexType.DICTIONARY); + assertEquals(buf.getChar(111), 'h'); + + assertTrue(sfd.hasIndexFor("bar", ColumnIndexType.TEXT_INDEX)); + + // Check if the text index still work. + LuceneTextIndexReader barReader = new LuceneTextIndexReader("bar", TEMP_DIR, 3, new HashMap<>()); + MutableRoaringBitmap ids = barReader.getDocIds("retain hold"); + assertTrue(ids.contains(0)); + assertTrue(ids.contains(2)); } } @Test + public void testCopyIndicesTo() + throws IOException { + File srcTmp = new File(TEMP_DIR, UUID.randomUUID().toString()); + if (!srcTmp.exists()) { + FileUtils.touch(srcTmp); + } + File dstTmp = new File(TEMP_DIR, UUID.randomUUID().toString()); + Map<IndexKey, IndexEntry> indicesToCopy = new TreeMap<>(ImmutableMap + .of(new IndexKey("foo", ColumnIndexType.INVERTED_INDEX), + new IndexEntry(new IndexKey("foo", ColumnIndexType.INVERTED_INDEX), 0, 0), + new IndexKey("foo", ColumnIndexType.FORWARD_INDEX), + new IndexEntry(new IndexKey("foo", ColumnIndexType.FORWARD_INDEX), 0, 0), + new IndexKey("bar", ColumnIndexType.FORWARD_INDEX), + new IndexEntry(new IndexKey("bar", ColumnIndexType.FORWARD_INDEX), 0, 0), + new IndexKey("bar", ColumnIndexType.DICTIONARY), + new IndexEntry(new IndexKey("bar", ColumnIndexType.DICTIONARY), 0, 0), + new IndexKey("bar", ColumnIndexType.JSON_INDEX), + new IndexEntry(new IndexKey("bar", ColumnIndexType.JSON_INDEX), 0, 0))); + List<IndexEntry> retained = SingleFileIndexDirectory.copyIndices(srcTmp, dstTmp, indicesToCopy); + List<IndexKey> retainedKeys = retained.stream().map(e -> 
e._key).collect(Collectors.toList()); + // The returned entries are sorted. + assertEquals(retainedKeys, Arrays + .asList(new IndexKey("bar", ColumnIndexType.DICTIONARY), new IndexKey("bar", ColumnIndexType.FORWARD_INDEX), + new IndexKey("bar", ColumnIndexType.JSON_INDEX), new IndexKey("foo", ColumnIndexType.FORWARD_INDEX), + new IndexKey("foo", ColumnIndexType.INVERTED_INDEX))); + } + + @Test + public void testPersistIndexMaps() { + ByteArrayOutputStream output = new ByteArrayOutputStream(1024 * 1024); + try (PrintWriter pw = new PrintWriter(output)) { + List<IndexEntry> entries = Arrays + .asList(new IndexEntry(new IndexKey("foo", ColumnIndexType.INVERTED_INDEX), 0, 1024), + new IndexEntry(new IndexKey("bar", ColumnIndexType.INVERTED_INDEX), 1024, 100), + new IndexEntry(new IndexKey("baz", ColumnIndexType.INVERTED_INDEX), 1124, 200)); + SingleFileIndexDirectory.persistIndexMaps(entries, pw); + } + assertEquals(output.toString(), "foo.inverted_index.startOffset = 0\nfoo.inverted_index.size = 1024\n" + + "bar.inverted_index.startOffset = 1024\nbar.inverted_index.size = 100\n" + + "baz.inverted_index.startOffset = 1124\nbaz.inverted_index.size = 200\n"); + } + + @Test public void testGetColumnIndices() throws Exception { try (SingleFileIndexDirectory spi = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { @@ -180,8 +345,14 @@ public class SingleFileIndexDirectoryTest { new HashSet<>(Collections.singletonList("col2"))); assertEquals(spi.getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX), new HashSet<>(Collections.singletonList("col4"))); - // TODO: implement removeIndex and test it in next RP - // spi.removeIndex("col1", ColumnIndexType.FORWARD_INDEX); + + spi.removeIndex("col1", ColumnIndexType.FORWARD_INDEX); + spi.removeIndex("col2", ColumnIndexType.DICTIONARY); + spi.removeIndex("col111", ColumnIndexType.DICTIONARY); + assertEquals(spi.getColumnsWithIndex(ColumnIndexType.FORWARD_INDEX), + new 
HashSet<>(Collections.singletonList("col3"))); + assertEquals(spi.getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX), + new HashSet<>(Collections.singletonList("col4"))); } } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java index ad5a760..ae6c127 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java @@ -81,12 +81,6 @@ public abstract class ColumnIndexDirectory implements Closeable { public abstract void removeIndex(String columnName, ColumnIndexType indexType); /** - * Check if the implementation supports removing existing index - * @return true if the index removal is supported - */ - public abstract boolean isIndexRemovalSupported(); - - /** * Get the columns with specific index type, loaded by column index directory. * @return a set of columns with such index type. 
*/ diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java index b8fcc92..e10d864 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java @@ -181,12 +181,6 @@ public abstract class SegmentDirectory implements Closeable { throws IOException; /** - * Check if the removal of index is a supported operation - * @return true if the index removal is supported - */ - public abstract boolean isIndexRemovalSupported(); - - /** * Removes an existing column index from directory * @param columnName column name * @param indexType column index type --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org