This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a074cde  mark and sweep indices for V3 segment format (#7301)
a074cde is described below

commit a074cde132d69e0ad03d64f6c07b74f822370fe0
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Fri Aug 20 15:47:39 2021 -0700

    mark and sweep indices for V3 segment format (#7301)
    
    Index cleanup for the V3 segment format consists of two steps: mark and
    sweep. Calling removeIndex() marks indices for removal; the sweep happens
    in the close() method of SingleFileIndexDirectory. The index creation
    handlers are expected to decide which indices to remove by comparing the
    local indices against those set in the table config, and then call
    removeIndex() for each of them.
    
    The sweep does no failure handling of its own. It is expected to run
    during segment reloading, which already handles failures: a backup folder
    is created before reloading starts, and the segment is restored from that
    backup folder upon any failure.
---
 .../local/segment/store/FilePerIndexDirectory.java |  14 +-
 .../segment/local/segment/store/IndexEntry.java    |   8 +-
 .../segment/local/segment/store/IndexKey.java      |  14 +-
 .../segment/store/SegmentLocalFSDirectory.java     |   5 -
 .../segment/store/SingleFileIndexDirectory.java    | 147 ++++++++++++-----
 .../store/{IndexEntry.java => TextIndexUtils.java} |  30 ++--
 .../segment/store/FilePerIndexDirectoryTest.java   |  66 +++++++-
 .../segment/local/segment/store/IndexKeyTest.java  |  43 +++++
 .../store/SingleFileIndexDirectoryTest.java        | 179 ++++++++++++++++++++-
 .../segment/spi/store/ColumnIndexDirectory.java    |   6 -
 .../pinot/segment/spi/store/SegmentDirectory.java  |   6 -
 11 files changed, 429 insertions(+), 89 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
index 43d6fd2..35cdf3c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -95,18 +96,15 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
 
   @Override
   public void removeIndex(String columnName, ColumnIndexType indexType) {
-    File indexFile = getFileFor(columnName, indexType);
-    if (indexFile.delete()) {
-      _indexBuffers.remove(new IndexKey(columnName, indexType));
+    _indexBuffers.remove(new IndexKey(columnName, indexType));
+    if (indexType == ColumnIndexType.TEXT_INDEX) {
+      TextIndexUtils.cleanupTextIndex(_segmentDirectory, columnName);
+    } else {
+      FileUtils.deleteQuietly(getFileFor(columnName, indexType));
     }
   }
 
   @Override
-  public boolean isIndexRemovalSupported() {
-    return true;
-  }
-
-  @Override
   public Set<String> getColumnsWithIndex(ColumnIndexType type) {
     Set<String> columns = new HashSet<>();
     for (IndexKey indexKey : _indexBuffers.keySet()) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java
index 9a0de05..bf20cf9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 class IndexEntry {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexEntry.class);
 
-  IndexKey _key;
+  final IndexKey _key;
   long _startOffset = -1;
   long _size = -1;
   PinotDataBuffer _buffer;
@@ -36,6 +36,12 @@ class IndexEntry {
     _key = key;
   }
 
+  public IndexEntry(IndexKey key, long startOffset, long size) {
+    _key = key;
+    _startOffset = startOffset;
+    _size = size;
+  }
+
   @Override
   public String toString() {
     return _key.toString() + " : [" + _startOffset + "," + (_startOffset + 
_size) + ")";
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java
index ffd78b3..a81cb9d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java
@@ -26,11 +26,11 @@ import org.slf4j.LoggerFactory;
 /**
  * Class representing index name and type
  */
-public class IndexKey {
+public class IndexKey implements Comparable<IndexKey> {
   private static final Logger LOGGER = LoggerFactory.getLogger(IndexKey.class);
 
-  String _name;
-  ColumnIndexType _type;
+  final String _name;
+  final ColumnIndexType _type;
 
   /**
    * @param name column name
@@ -69,4 +69,12 @@ public class IndexKey {
   public String toString() {
     return _name + "." + _type.getIndexName();
   }
+
+  @Override
+  public int compareTo(IndexKey o) {
+    if (_name.equals(o._name)) {
+      return _type.compareTo(o._type);
+    }
+    return _name.compareTo(o._name);
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
index 347ad05..3729514 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
@@ -337,11 +337,6 @@ public class SegmentLocalFSDirectory extends 
SegmentDirectory {
     }
 
     @Override
-    public boolean isIndexRemovalSupported() {
-      return _columnIndexDirectory.isIndexRemovalSupported();
-    }
-
-    @Override
     public void removeIndex(String columnName, ColumnIndexType indexType) {
       _columnIndexDirectory.removeIndex(columnName, indexType);
     }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
index aae4bc7..024c5d6 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
@@ -18,14 +18,16 @@
  */
 package org.apache.pinot.segment.local.segment.store;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.io.RandomAccessFile;
 import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,6 +38,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -84,6 +87,13 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
   private final Map<IndexKey, IndexEntry> _columnEntries;
   private final List<PinotDataBuffer> _allocBuffers;
 
+  // For V3 segment format, the index cleanup consists of two steps: mark and 
sweep.
+  // The removeIndex() method marks an index to be removed; and the index info 
is
+  // deleted from _columnEntries so that it becomes unavailable from now on. 
Then,
+  // The cleanupRemovedIndices() method cleans up the marked indices from disk 
and
+  // re-arranges the content in index file to keep it compact.
+  private boolean _shouldCleanupRemovedIndices;
+
   /**
    * @param segmentDirectory File pointing to segment directory
    * @param segmentMetadata segment metadata. Metadata must be fully 
initialized
@@ -131,28 +141,12 @@ class SingleFileIndexDirectory extends 
ColumnIndexDirectory {
   @Override
   public boolean hasIndexFor(String column, ColumnIndexType type) {
     if (type == ColumnIndexType.TEXT_INDEX) {
-      return hasTextIndex(column);
+      return TextIndexUtils.hasTextIndex(_segmentDirectory, column);
     }
     IndexKey key = new IndexKey(column, type);
     return _columnEntries.containsKey(key);
   }
 
-  private boolean hasTextIndex(String column) {
-    String suffix = V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION;
-    File[] textIndexFiles = _segmentDirectory.listFiles(new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return name.equals(column + suffix);
-      }
-    });
-    if (textIndexFiles.length > 0) {
-      Preconditions.checkState(textIndexFiles.length == 1,
-          "Illegal number of text index directories for columns " + column + " 
segment directory " + _segmentDirectory.getAbsolutePath());
-      return true;
-    }
-    return false;
-  }
-
   private PinotDataBuffer checkAndGetIndexBuffer(String column, 
ColumnIndexType type) {
     IndexKey key = new IndexKey(column, type);
     IndexEntry entry = _columnEntries.get(key);
@@ -320,46 +314,65 @@ class SingleFileIndexDirectory extends 
ColumnIndexDirectory {
       throws IOException {
     File mapFile = new File(_segmentDirectory, 
V1Constants.INDEX_MAP_FILE_NAME);
     try (PrintWriter writer = new PrintWriter(new BufferedWriter(new 
FileWriter(mapFile, true)))) {
-      String startKey = getKey(entry._key._name, 
entry._key._type.getIndexName(), true);
-
-      StringBuilder sb = new StringBuilder();
-      sb.append(startKey).append(" = ").append(entry._startOffset);
-      writer.println(sb.toString());
-
-      String endKey = getKey(entry._key._name, 
entry._key._type.getIndexName(), false);
-      sb = new StringBuilder();
-      sb.append(endKey).append(" = ").append(entry._size);
-      writer.println(sb.toString());
+      persistIndexMap(entry, writer);
     }
   }
 
-  private String getKey(String column, String indexName, boolean 
isStartOffset) {
-    return column + MAP_KEY_SEPARATOR + indexName + MAP_KEY_SEPARATOR + 
(isStartOffset ? "startOffset" : "size");
-  }
-
   private String allocationContext(IndexKey key) {
     return this.getClass().getSimpleName() + key.toString();
   }
 
+  /**
+   * This method sweeps the indices marked for removal. Exception is simply 
bubbled up w/o
+   * trying to recover disk states from failure. This method is expected to 
run during segment
+   * reloading, which has failure handling by creating a backup folder before 
doing reloading.
+   */
+  private void cleanupRemovedIndices()
+      throws IOException {
+    File tmpIdxFile = new File(_segmentDirectory, V1Constants.INDEX_FILE_NAME 
+ ".tmp");
+    // Sort indices by column name and index type while copying, so that the
+    // new index_map file is easy to inspect for troubleshooting.
+    List<IndexEntry> retained = copyIndices(_indexFile, tmpIdxFile, new 
TreeMap<>(_columnEntries));
+
+    FileUtils.deleteQuietly(_indexFile);
+    Preconditions
+        .checkState(tmpIdxFile.renameTo(_indexFile), "Failed to rename temp 
index file: %s to original index file: %s",
+            tmpIdxFile, _indexFile);
+
+    File mapFile = new File(_segmentDirectory, 
V1Constants.INDEX_MAP_FILE_NAME);
+    FileUtils.deleteQuietly(mapFile);
+    try (PrintWriter writer = new PrintWriter(new BufferedWriter(new 
FileWriter(mapFile)))) {
+      persistIndexMaps(retained, writer);
+    }
+  }
+
   @Override
   public void close()
       throws IOException {
     for (PinotDataBuffer buf : _allocBuffers) {
       buf.close();
     }
+    // Cleanup removed indices after closing and flushing buffers, so
+    // that potential index updates can be persisted across cleanups.
+    if (_shouldCleanupRemovedIndices) {
+      cleanupRemovedIndices();
+    }
     _columnEntries.clear();
     _allocBuffers.clear();
   }
 
   @Override
   public void removeIndex(String columnName, ColumnIndexType indexType) {
-    throw new UnsupportedOperationException(
-        "Index removal is not supported for single file index format. 
Requested colum: " + columnName + " indexType: " + indexType);
-  }
-
-  @Override
-  public boolean isIndexRemovalSupported() {
-    return false;
+    // Text index is kept in its own files, thus can be removed directly.
+    if (indexType == ColumnIndexType.TEXT_INDEX) {
+      TextIndexUtils.cleanupTextIndex(_segmentDirectory, columnName);
+      return;
+    }
+    // Only remember to cleanup indices upon close(), if any existing
+    // index gets marked for removal.
+    if (_columnEntries.remove(new IndexKey(columnName, indexType)) != null) {
+      _shouldCleanupRemovedIndices = true;
+    }
   }
 
   @Override
@@ -377,4 +390,60 @@ class SingleFileIndexDirectory extends 
ColumnIndexDirectory {
   public String toString() {
     return _segmentDirectory.toString() + "/" + _indexFile.toString();
   }
+
+  /**
+   * Copy indices, as specified in the Map, from src file to dest file. The 
Map contains info
+   * about where to find the index data in the src file, like startOffsets and 
data sizes. The
+   * indices are packed together in the dest file, and their positions are 
returned.
+   *
+   * @param srcFile contains indices to copy to dest file, and it may contain 
other data to leave behind.
+   * @param destFile holds the indices copied from src file, and those indices 
appended one after another.
+   * @param indicesToCopy specifies where to find the indices in the src file, 
with offset and size info.
+   * @return the offsets and sizes for the indices in the dest file.
+   * @throws IOException from FileChannels upon failure to r/w the index 
files, and simply raised to caller.
+   */
+  @VisibleForTesting
+  static List<IndexEntry> copyIndices(File srcFile, File destFile, 
Map<IndexKey, IndexEntry> indicesToCopy)
+      throws IOException {
+    // Copy index from original index file and append to temp file.
+    // Keep track of the index entry pointing to the temp index file.
+    List<IndexEntry> retained = new ArrayList<>();
+    long nextOffset = 0;
+    // With FileChannel, we can seek to the data flexibly.
+    try (FileChannel srcCh = new RandomAccessFile(srcFile, "r").getChannel();
+        FileChannel dstCh = new RandomAccessFile(destFile, "rw").getChannel()) 
{
+      for (IndexEntry index : indicesToCopy.values()) {
+        srcCh.transferTo(index._startOffset, index._size, dstCh);
+        retained.add(new IndexEntry(index._key, nextOffset, index._size));
+        nextOffset += index._size;
+      }
+    }
+    return retained;
+  }
+
+  private static String getKey(String column, String indexName, boolean 
isStartOffset) {
+    return column + MAP_KEY_SEPARATOR + indexName + MAP_KEY_SEPARATOR + 
(isStartOffset ? "startOffset" : "size");
+  }
+
+  @VisibleForTesting
+  static void persistIndexMaps(List<IndexEntry> entries, PrintWriter writer) {
+    for (IndexEntry entry : entries) {
+      persistIndexMap(entry, writer);
+    }
+  }
+
+  private static void persistIndexMap(IndexEntry entry, PrintWriter writer) {
+    String colName = entry._key._name;
+    String idxType = entry._key._type.getIndexName();
+
+    String startKey = getKey(colName, idxType, true);
+    StringBuilder sb = new StringBuilder();
+    sb.append(startKey).append(" = ").append(entry._startOffset);
+    writer.println(sb);
+
+    String endKey = getKey(colName, idxType, false);
+    sb = new StringBuilder();
+    sb.append(endKey).append(" = ").append(entry._size);
+    writer.println(sb);
+  }
 }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/TextIndexUtils.java
similarity index 53%
copy from 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java
copy to 
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/TextIndexUtils.java
index 9a0de05..969f414 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/TextIndexUtils.java
@@ -18,26 +18,24 @@
  */
 package org.apache.pinot.segment.local.segment.store;
 
-import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.File;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.spi.V1Constants;
 
 
-/* package-private */
-class IndexEntry {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexEntry.class);
-
-  IndexKey _key;
-  long _startOffset = -1;
-  long _size = -1;
-  PinotDataBuffer _buffer;
+class TextIndexUtils {
+  private TextIndexUtils() {
+  }
 
-  public IndexEntry(IndexKey key) {
-    _key = key;
+  static void cleanupTextIndex(File segDir, String column) {
+    // Remove the lucene index file and potentially the docId mapping file.
+    File idxFile = new File(segDir, column + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION);
+    FileUtils.deleteQuietly(idxFile);
+    File mapFile = new File(segDir, column + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION);
+    FileUtils.deleteQuietly(mapFile);
   }
 
-  @Override
-  public String toString() {
-    return _key.toString() + " : [" + _startOffset + "," + (_startOffset + 
_size) + ")";
+  static boolean hasTextIndex(File segDir, String column) {
+    return new File(segDir, column + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION).exists();
   }
 }
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
index 909a235..0733449 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectoryTest.java
@@ -22,8 +22,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
+import 
org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader;
+import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -31,6 +35,7 @@ import 
org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.util.TestUtils;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -157,13 +162,72 @@ public class FilePerIndexDirectoryTest {
       fpi.newBuffer("col2", ColumnIndexType.DICTIONARY, 100);
       assertTrue(fpi.getFileFor("col1", 
ColumnIndexType.FORWARD_INDEX).exists());
       assertTrue(fpi.getFileFor("col2", ColumnIndexType.DICTIONARY).exists());
-      assertTrue(fpi.isIndexRemovalSupported());
       fpi.removeIndex("col1", ColumnIndexType.FORWARD_INDEX);
       assertFalse(fpi.getFileFor("col1", 
ColumnIndexType.FORWARD_INDEX).exists());
     }
   }
 
   @Test
+  public void testRemoveTextIndices()
+      throws IOException {
+    try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
+        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true);
+        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true)) {
+      PinotDataBuffer buf = fpi.newBuffer("col1", 
ColumnIndexType.FORWARD_INDEX, 1024);
+      buf.putInt(0, 1);
+
+      buf = fpi.newBuffer("col1", ColumnIndexType.DICTIONARY, 1024);
+      buf.putChar(111, 'h');
+
+      fooCreator.add("{\"clean\":\"this\"}");
+      fooCreator.seal();
+      barCreator.add("{\"retain\":\"this\"}");
+      barCreator.add("{\"keep\":\"this\"}");
+      barCreator.add("{\"hold\":\"this\"}");
+      barCreator.seal();
+    }
+
+    // Remove the Text index to trigger cleanup.
+    try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
+      assertTrue(fpi.hasIndexFor("foo", ColumnIndexType.TEXT_INDEX));
+      // Use TextIndex once to trigger the creation of mapping files.
+      LuceneTextIndexReader fooReader = new LuceneTextIndexReader("foo", 
TEMP_DIR, 1, new HashMap<>());
+      fooReader.getDocIds("clean");
+      LuceneTextIndexReader barReader = new LuceneTextIndexReader("bar", 
TEMP_DIR, 3, new HashMap<>());
+      barReader.getDocIds("retain hold");
+
+      // Both files for TextIndex should be removed.
+      fpi.removeIndex("foo", ColumnIndexType.TEXT_INDEX);
+      assertFalse(new File(TEMP_DIR, "foo" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION).exists());
+      assertFalse(
+          new File(TEMP_DIR, "foo" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists());
+    }
+    assertTrue(new File(TEMP_DIR, "bar" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION).exists());
+    assertTrue(new File(TEMP_DIR, "bar" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists());
+
+    // Read indices back and check the content.
+    try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
+      assertFalse(fpi.hasIndexFor("foo", ColumnIndexType.TEXT_INDEX));
+
+      assertTrue(fpi.hasIndexFor("col1", ColumnIndexType.FORWARD_INDEX));
+      PinotDataBuffer buf = fpi.getBuffer("col1", 
ColumnIndexType.FORWARD_INDEX);
+      assertEquals(buf.getInt(0), 1);
+
+      assertTrue(fpi.hasIndexFor("col1", ColumnIndexType.DICTIONARY));
+      buf = fpi.getBuffer("col1", ColumnIndexType.DICTIONARY);
+      assertEquals(buf.getChar(111), 'h');
+
+      assertTrue(fpi.hasIndexFor("bar", ColumnIndexType.TEXT_INDEX));
+
+      // Check if the text index still work.
+      LuceneTextIndexReader barReader = new LuceneTextIndexReader("bar", 
TEMP_DIR, 3, new HashMap<>());
+      MutableRoaringBitmap ids = barReader.getDocIds("retain hold");
+      assertTrue(ids.contains(0));
+      assertTrue(ids.contains(2));
+    }
+  }
+
+  @Test
   public void testGetColumnIndices()
       throws IOException {
     try (FilePerIndexDirectory fpi = new FilePerIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/IndexKeyTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/IndexKeyTest.java
new file mode 100644
index 0000000..cf76411
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/IndexKeyTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.store;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.segment.spi.store.ColumnIndexType;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class IndexKeyTest {
+  @Test
+  public void testCompareTo() {
+    List<IndexKey> iks = Arrays
+        .asList(new IndexKey("foo", ColumnIndexType.INVERTED_INDEX), new 
IndexKey("bar", ColumnIndexType.BLOOM_FILTER),
+            new IndexKey("foo", ColumnIndexType.FORWARD_INDEX), new 
IndexKey("bar", ColumnIndexType.DICTIONARY),
+            new IndexKey("baz", ColumnIndexType.JSON_INDEX), new 
IndexKey("baz", ColumnIndexType.FST_INDEX));
+    Collections.sort(iks);
+    assertEquals(iks, Arrays
+        .asList(new IndexKey("bar", ColumnIndexType.DICTIONARY), new 
IndexKey("bar", ColumnIndexType.BLOOM_FILTER),
+            new IndexKey("baz", ColumnIndexType.FST_INDEX), new 
IndexKey("baz", ColumnIndexType.JSON_INDEX),
+            new IndexKey("foo", ColumnIndexType.FORWARD_INDEX), new 
IndexKey("foo", ColumnIndexType.INVERTED_INDEX)));
+  }
+}
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
index 81422d9..e31a3dd 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
@@ -18,13 +18,25 @@
  */
 package org.apache.pinot.segment.local.segment.store;
 
+import com.google.common.collect.ImmutableMap;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.stream.Collectors;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.io.FileUtils;
+import 
org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
+import 
org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader;
+import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -33,12 +45,14 @@ import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.util.TestUtils;
 import org.mockito.Mockito;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 
 public class SingleFileIndexDirectoryTest {
@@ -155,17 +169,168 @@ public class SingleFileIndexDirectoryTest {
     }
   }
 
-  @Test(expectedExceptions = UnsupportedOperationException.class)
+  @Test
   public void testRemoveIndex()
       throws IOException, ConfigurationException {
     try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
       sfd.newBuffer("col1", ColumnIndexType.DICTIONARY, 1024);
-      assertFalse(sfd.isIndexRemovalSupported());
       sfd.removeIndex("col1", ColumnIndexType.DICTIONARY);
+      assertFalse(sfd.hasIndexFor("col1", ColumnIndexType.DICTIONARY));
+    }
+  }
+
+  @Test
+  public void testCleanupRemovedIndices()
+      throws IOException, ConfigurationException {
+    try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
+      PinotDataBuffer buf = sfd.newBuffer("col1", 
ColumnIndexType.FORWARD_INDEX, 1024);
+      buf.putInt(0, 1); // from begin position.
+
+      buf = sfd.newBuffer("col1", ColumnIndexType.DICTIONARY, 1024);
+      buf.putChar(111, 'h');
+
+      buf = sfd.newBuffer("col2", ColumnIndexType.FORWARD_INDEX, 1024);
+      buf.putChar(222, 'w');
+
+      buf = sfd.newBuffer("col1", ColumnIndexType.JSON_INDEX, 1024);
+      buf.putLong(333, 111111L);
+
+      buf = sfd.newBuffer("col2", ColumnIndexType.H3_INDEX, 1024);
+      buf.putDouble(1016, 222.222); // touch end position.
+    }
+
+    // Remove the JSON index to trigger cleanup, but keep H3 index.
+    try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
+      assertTrue(sfd.hasIndexFor("col1", ColumnIndexType.JSON_INDEX));
+      sfd.removeIndex("col1", ColumnIndexType.JSON_INDEX);
+    }
+
+    // Read indices back and check the content.
+    try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
+      assertFalse(sfd.hasIndexFor("col1", ColumnIndexType.JSON_INDEX));
+
+      assertTrue(sfd.hasIndexFor("col1", ColumnIndexType.FORWARD_INDEX));
+      PinotDataBuffer buf = sfd.getBuffer("col1", 
ColumnIndexType.FORWARD_INDEX);
+      assertEquals(buf.getInt(0), 1);
+
+      assertTrue(sfd.hasIndexFor("col1", ColumnIndexType.DICTIONARY));
+      buf = sfd.getBuffer("col1", ColumnIndexType.DICTIONARY);
+      assertEquals(buf.getChar(111), 'h');
+
+      assertTrue(sfd.hasIndexFor("col2", ColumnIndexType.FORWARD_INDEX));
+      buf = sfd.getBuffer("col2", ColumnIndexType.FORWARD_INDEX);
+      assertEquals(buf.getChar(222), 'w');
+
+      assertTrue(sfd.hasIndexFor("col2", ColumnIndexType.H3_INDEX));
+      buf = sfd.getBuffer("col2", ColumnIndexType.H3_INDEX);
+      assertEquals(buf.getDouble(1016), 222.222);
+    }
+  }
+
+  @Test
+  public void testRemoveTextIndices()
+      throws IOException, ConfigurationException {
+    try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap);
+        LuceneTextIndexCreator fooCreator = new LuceneTextIndexCreator("foo", 
TEMP_DIR, true);
+        LuceneTextIndexCreator barCreator = new LuceneTextIndexCreator("bar", 
TEMP_DIR, true)) {
+      PinotDataBuffer buf = sfd.newBuffer("col1", 
ColumnIndexType.FORWARD_INDEX, 1024);
+      buf.putInt(0, 1);
+
+      buf = sfd.newBuffer("col1", ColumnIndexType.DICTIONARY, 1024);
+      buf.putChar(111, 'h');
+
+      fooCreator.add("{\"clean\":\"this\"}");
+      fooCreator.seal();
+      barCreator.add("{\"retain\":\"this\"}");
+      barCreator.add("{\"keep\":\"this\"}");
+      barCreator.add("{\"hold\":\"this\"}");
+      barCreator.seal();
+    }
+
+    // Remove the Text index to trigger cleanup.
+    try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
+      assertTrue(sfd.hasIndexFor("foo", ColumnIndexType.TEXT_INDEX));
+      // Use TextIndex once to trigger the creation of mapping files.
+      LuceneTextIndexReader fooReader = new LuceneTextIndexReader("foo", 
TEMP_DIR, 1, new HashMap<>());
+      fooReader.getDocIds("clean");
+      LuceneTextIndexReader barReader = new LuceneTextIndexReader("bar", 
TEMP_DIR, 3, new HashMap<>());
+      barReader.getDocIds("retain hold");
+
+      // Both files for TextIndex should be removed.
+      sfd.removeIndex("foo", ColumnIndexType.TEXT_INDEX);
+      assertFalse(new File(TEMP_DIR, "foo" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION).exists());
+      assertFalse(
+          new File(TEMP_DIR, "foo" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists());
+    }
+    assertTrue(new File(TEMP_DIR, "bar" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_FILE_EXTENSION).exists());
+    assertTrue(new File(TEMP_DIR, "bar" + 
V1Constants.Indexes.LUCENE_TEXT_INDEX_DOCID_MAPPING_FILE_EXTENSION).exists());
+
+    // Read indices back and check the content.
+    try (SingleFileIndexDirectory sfd = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
+      assertFalse(sfd.hasIndexFor("foo", ColumnIndexType.TEXT_INDEX));
+
+      assertTrue(sfd.hasIndexFor("col1", ColumnIndexType.FORWARD_INDEX));
+      PinotDataBuffer buf = sfd.getBuffer("col1", 
ColumnIndexType.FORWARD_INDEX);
+      assertEquals(buf.getInt(0), 1);
+
+      assertTrue(sfd.hasIndexFor("col1", ColumnIndexType.DICTIONARY));
+      buf = sfd.getBuffer("col1", ColumnIndexType.DICTIONARY);
+      assertEquals(buf.getChar(111), 'h');
+
+      assertTrue(sfd.hasIndexFor("bar", ColumnIndexType.TEXT_INDEX));
+
+      // Check if the text index still work.
+      LuceneTextIndexReader barReader = new LuceneTextIndexReader("bar", 
TEMP_DIR, 3, new HashMap<>());
+      MutableRoaringBitmap ids = barReader.getDocIds("retain hold");
+      assertTrue(ids.contains(0));
+      assertTrue(ids.contains(2));
     }
   }
 
   @Test
+  public void testCopyIndicesTo()
+      throws IOException {
+    File srcTmp = new File(TEMP_DIR, UUID.randomUUID().toString());
+    if (!srcTmp.exists()) {
+      FileUtils.touch(srcTmp);
+    }
+    File dstTmp = new File(TEMP_DIR, UUID.randomUUID().toString());
+    Map<IndexKey, IndexEntry> indicesToCopy = new TreeMap<>(ImmutableMap
+        .of(new IndexKey("foo", ColumnIndexType.INVERTED_INDEX),
+            new IndexEntry(new IndexKey("foo", 
ColumnIndexType.INVERTED_INDEX), 0, 0),
+            new IndexKey("foo", ColumnIndexType.FORWARD_INDEX),
+            new IndexEntry(new IndexKey("foo", ColumnIndexType.FORWARD_INDEX), 
0, 0),
+            new IndexKey("bar", ColumnIndexType.FORWARD_INDEX),
+            new IndexEntry(new IndexKey("bar", ColumnIndexType.FORWARD_INDEX), 
0, 0),
+            new IndexKey("bar", ColumnIndexType.DICTIONARY),
+            new IndexEntry(new IndexKey("bar", ColumnIndexType.DICTIONARY), 0, 
0),
+            new IndexKey("bar", ColumnIndexType.JSON_INDEX),
+            new IndexEntry(new IndexKey("bar", ColumnIndexType.JSON_INDEX), 0, 
0)));
+    List<IndexEntry> retained = SingleFileIndexDirectory.copyIndices(srcTmp, 
dstTmp, indicesToCopy);
+    List<IndexKey> retainedKeys = retained.stream().map(e -> 
e._key).collect(Collectors.toList());
+    // The returned entries are sorted.
+    assertEquals(retainedKeys, Arrays
+        .asList(new IndexKey("bar", ColumnIndexType.DICTIONARY), new 
IndexKey("bar", ColumnIndexType.FORWARD_INDEX),
+            new IndexKey("bar", ColumnIndexType.JSON_INDEX), new 
IndexKey("foo", ColumnIndexType.FORWARD_INDEX),
+            new IndexKey("foo", ColumnIndexType.INVERTED_INDEX)));
+  }
+
+  @Test
+  public void testPersistIndexMaps() {
+    ByteArrayOutputStream output = new ByteArrayOutputStream(1024 * 1024);
+    try (PrintWriter pw = new PrintWriter(output)) {
+      List<IndexEntry> entries = Arrays
+          .asList(new IndexEntry(new IndexKey("foo", 
ColumnIndexType.INVERTED_INDEX), 0, 1024),
+              new IndexEntry(new IndexKey("bar", 
ColumnIndexType.INVERTED_INDEX), 1024, 100),
+              new IndexEntry(new IndexKey("baz", 
ColumnIndexType.INVERTED_INDEX), 1124, 200));
+      SingleFileIndexDirectory.persistIndexMaps(entries, pw);
+    }
+    assertEquals(output.toString(), "foo.inverted_index.startOffset = 
0\nfoo.inverted_index.size = 1024\n"
+        + "bar.inverted_index.startOffset = 1024\nbar.inverted_index.size = 
100\n"
+        + "baz.inverted_index.startOffset = 1124\nbaz.inverted_index.size = 
200\n");
+  }
+
+  @Test
   public void testGetColumnIndices()
       throws Exception {
     try (SingleFileIndexDirectory spi = new SingleFileIndexDirectory(TEMP_DIR, 
_segmentMetadata, ReadMode.mmap)) {
@@ -180,8 +345,14 @@ public class SingleFileIndexDirectoryTest {
           new HashSet<>(Collections.singletonList("col2")));
       assertEquals(spi.getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX),
           new HashSet<>(Collections.singletonList("col4")));
-      // TODO: implement removeIndex and test it in next RP
-      // spi.removeIndex("col1", ColumnIndexType.FORWARD_INDEX);
+
+      spi.removeIndex("col1", ColumnIndexType.FORWARD_INDEX);
+      spi.removeIndex("col2", ColumnIndexType.DICTIONARY);
+      spi.removeIndex("col111", ColumnIndexType.DICTIONARY);
+      assertEquals(spi.getColumnsWithIndex(ColumnIndexType.FORWARD_INDEX),
+          new HashSet<>(Collections.singletonList("col3")));
+      assertEquals(spi.getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX),
+          new HashSet<>(Collections.singletonList("col4")));
     }
   }
 }
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
index ad5a760..ae6c127 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
@@ -81,12 +81,6 @@ public abstract class ColumnIndexDirectory implements 
Closeable {
   public abstract void removeIndex(String columnName, ColumnIndexType 
indexType);
 
   /**
-   * Check if the implementation supports removing existing index
-   * @return true if the index removal is supported
-   */
-  public abstract boolean isIndexRemovalSupported();
-
-  /**
    * Get the columns with specific index type, loaded by column index 
directory.
    * @return a set of columns with such index type.
    */
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
index b8fcc92..e10d864 100644
--- 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
@@ -181,12 +181,6 @@ public abstract class SegmentDirectory implements 
Closeable {
         throws IOException;
 
     /**
-     * Check if the removal of index is a supported operation
-     * @return true if the index removal is supported
-     */
-    public abstract boolean isIndexRemovalSupported();
-
-    /**
      * Removes an existing column index from directory
      * @param columnName column name
      * @param indexType column index type

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to