This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch nested-object-indexing-1
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 224299deccf308ddfdab2e59556d413944596329
Author: kishore gopalakrishna <g.kish...@gmail.com>
AuthorDate: Thu Jan 24 19:53:29 2019 -0800

    Wiring up end to end to support indexing nested fields on complex objects
---
 .../org/apache/pinot/common/data/FieldSpec.java    |   3 +
 .../pinot/common/data/objects/TextObject.java      |   3 +-
 pinot-core/pom.xml                                 |  19 ++-
 .../immutable/ImmutableSegmentLoader.java          |   4 +-
 .../io/reader/impl/v1/SortedIndexReaderImpl.java   |   6 +
 .../core/operator/filter/FilterOperatorUtils.java  |   8 +-
 .../filter/IndexBasedMatchesFilterOperator.java    |  86 +++++++++++
 .../filter/ScanBasedMatchesFilterOperator.java     |  33 ++--
 .../MatchesPredicateEvaluatorFactory.java          |  21 +--
 .../invertedindex/RealtimeInvertedIndexReader.java |   8 +-
 .../core/segment/creator/InvertedIndexCreator.java |   8 +
 .../core/segment/creator/impl/V1Constants.java     |   1 +
 .../creator/impl/inv/LuceneIndexCreator.java       | 127 +++++++++++++++
 .../inv/OffHeapBitmapInvertedIndexCreator.java     |   6 +
 .../impl/inv/OnHeapBitmapInvertedIndexCreator.java |   6 +
 .../index/column/PhysicalColumnIndexContainer.java |  22 ++-
 .../index/data/source/ColumnDataSource.java        |   2 +-
 .../loader/invertedindex/InvertedIndexHandler.java | 106 ++++++++++++-
 .../index/readers/BitmapInvertedIndexReader.java   |   7 +
 .../segment/index/readers/InvertedIndexReader.java |   8 +
 .../index/readers/LuceneInvertedIndexReader.java   | 157 +++++++++++++++++++
 .../virtualcolumn/DocIdVirtualColumnProvider.java  |   7 +-
 .../SingleStringVirtualColumnProvider.java         |   7 +-
 .../tests/LuceneIndexClusterIntegrationTest.java   | 172 +++++++++++++++++++++
 pinot-perf/pom.xml                                 |  35 ++++-
 .../org/apache/pinot/perf/LuceneBenchmark.java     |  80 ++++++++++
 .../apache/pinot/tools/perf/ZookeeperLauncher.java |   2 +-
 27 files changed, 887 insertions(+), 57 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java 
b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
index 080f0e7..ad5a5a3 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
@@ -335,6 +335,9 @@ public abstract class FieldSpec implements 
Comparable<FieldSpec>, ConfigNodeLife
       case STRING:
         jsonSchema.set("type", convertStringsToJsonArray("null", "string"));
         return jsonSchema;
+      case BYTES:
+        jsonSchema.set("type", convertStringsToJsonArray("null", "bytes"));
+        return jsonSchema;
       default:
         throw new UnsupportedOperationException();
     }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
index cf5f27e..c171c40 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
@@ -27,7 +27,8 @@ import com.google.common.collect.Lists;
 public class TextObject implements PinotObject {
 
   byte[] _bytes;
-  private static List<String> _FIELDS = Lists.newArrayList("Content");
+  public static String DEFAULT_FIELD = "Content";
+  private static List<String> _FIELDS = Lists.newArrayList(DEFAULT_FIELD);
 
   @Override
   public void init(byte[] bytes) {
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 8f4efe2..4c8215a 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -196,7 +196,24 @@
         </exclusion>
       </exclusions>
     </dependency>
-
+    
+    <!-- Lucene -->
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+      <version>7.6.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-queryparser</artifactId>
+      <version>7.6.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-analyzers-common</artifactId>
+      <version>7.6.0</version>
+    </dependency>
+    
     <!-- test -->
     <dependency>
       <groupId>org.mockito</groupId>
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index 2ddb80c..3d7ac31 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -118,8 +118,8 @@ public class ImmutableSegmentLoader {
     SegmentDirectory.Reader segmentReader = segmentDirectory.createReader();
     Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>();
     for (Map.Entry<String, ColumnMetadata> entry : 
segmentMetadata.getColumnMetadataMap().entrySet()) {
-      indexContainerMap
-          .put(entry.getKey(), new PhysicalColumnIndexContainer(segmentReader, 
entry.getValue(), indexLoadingConfig));
+      indexContainerMap.put(entry.getKey(),
+          new PhysicalColumnIndexContainer(indexDir, segmentReader, 
entry.getValue(), indexLoadingConfig));
     }
 
     if (schema == null) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java
index 0018b8c..a393dd4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.io.reader.impl.v1;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import org.apache.pinot.common.utils.Pairs;
+import org.apache.pinot.common.utils.Pairs.IntPair;
+import org.apache.pinot.core.common.Predicate;
 import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
 import org.apache.pinot.core.io.reader.ReaderContext;
 import org.apache.pinot.core.io.util.FixedByteValueReaderWriter;
@@ -120,6 +122,10 @@ public class SortedIndexReaderImpl extends 
BaseSingleColumnSingleValueReader<Sor
   }
 
   @Override
+  public IntPair getDocIds(Predicate predicate) {
+    throw new UnsupportedOperationException("");
+  }
+  @Override
   public Pairs.IntPair getDocIds(int dictId) {
     return new Pairs.IntPair(_reader.getInt(2 * dictId), _reader.getInt(2 * 
dictId + 1));
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
index bc46b2f..f4354ae 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
@@ -56,7 +56,13 @@ public class FilterOperatorUtils {
     // Use inverted index if the predicate type is not RANGE or REGEXP_LIKE 
for efficiency
     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
     Predicate.Type predicateType = predicateEvaluator.getPredicateType();
-    if (dataSourceMetadata.hasInvertedIndex() && (predicateType != 
Predicate.Type.RANGE) && (predicateType
+    if(predicateType == Predicate.Type.MATCHES) {
+      if(dataSourceMetadata.hasInvertedIndex()) {
+        return new IndexBasedMatchesFilterOperator(predicateEvaluator, 
dataSource, startDocId, endDocId);
+      } else {
+        return new ScanBasedMatchesFilterOperator(predicateEvaluator, 
dataSource, startDocId, endDocId);
+      }
+    } else if (dataSourceMetadata.hasInvertedIndex() && (predicateType != 
Predicate.Type.RANGE) && (predicateType
         != Predicate.Type.REGEXP_LIKE)) {
       if (dataSourceMetadata.isSorted()) {
         return new SortedInvertedIndexBasedFilterOperator(predicateEvaluator, 
dataSource, startDocId, endDocId);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/IndexBasedMatchesFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/IndexBasedMatchesFilterOperator.java
new file mode 100644
index 0000000..50f39b7
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/IndexBasedMatchesFilterOperator.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.Preconditions;
+import org.apache.lucene.search.TopDocs;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.common.predicate.MatchesPredicate;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import 
org.apache.pinot.core.operator.filter.predicate.MatchesPredicateEvaluatorFactory.DefaultMatchesPredicateEvaluator;
+import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IndexBasedMatchesFilterOperator extends BaseFilterOperator {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(IndexBasedMatchesFilterOperator.class);
+  private static final String OPERATOR_NAME = 
"IndexBasedMatchesFilterOperator";
+
+  private final DataSource _dataSource;
+  private final int _startDocId;
+  // TODO: change it to exclusive
+  // Inclusive
+  private final int _endDocId;
+  private MatchesPredicate _matchesPredicate;
+
+  public IndexBasedMatchesFilterOperator(PredicateEvaluator predicateEvaluator,
+      DataSource dataSource, int startDocId, int endDocId) {
+    // NOTE:
+    // Predicate that is always evaluated as true or false should not be 
passed into the
+    // TextMatchFilterOperator for
+    // performance concern.
+    // If predicate is always evaluated as true, use MatchAllFilterOperator; 
if predicate is always
+    // evaluated as false,
+    // use EmptyFilterOperator.
+    Preconditions
+        .checkArgument(!predicateEvaluator.isAlwaysTrue() && 
!predicateEvaluator.isAlwaysFalse());
+    Preconditions.checkArgument(predicateEvaluator instanceof 
DefaultMatchesPredicateEvaluator);
+
+    DefaultMatchesPredicateEvaluator evaluator =
+        (DefaultMatchesPredicateEvaluator) predicateEvaluator;
+    _matchesPredicate = evaluator.getMatchesPredicate();
+    _dataSource = dataSource;
+    _startDocId = startDocId;
+    _endDocId = endDocId;
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+
+    InvertedIndexReader invertedIndex = _dataSource.getInvertedIndex();
+    MutableRoaringBitmap bitmap = (MutableRoaringBitmap) 
invertedIndex.getDocIds(_matchesPredicate);
+
+    boolean exclusive = false;
+    ImmutableRoaringBitmap[] bitmapArray = new ImmutableRoaringBitmap[] {
+        bitmap
+    };
+    return new FilterBlock(new BitmapDocIdSet(bitmapArray, _startDocId, 
_endDocId, exclusive));
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedMatchesFilterOperator.java
similarity index 59%
copy from 
pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
copy to 
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedMatchesFilterOperator.java
index cf5f27e..1f730e1 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedMatchesFilterOperator.java
@@ -16,36 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.data.objects;
+package org.apache.pinot.core.operator.filter;
 
-import java.util.List;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 
-import org.apache.pinot.common.data.PinotObject;
+public class ScanBasedMatchesFilterOperator  extends BaseFilterOperator {
 
-import com.google.common.collect.Lists;
-
-public class TextObject implements PinotObject {
-
-  byte[] _bytes;
-  private static List<String> _FIELDS = Lists.newArrayList("Content");
-
-  @Override
-  public void init(byte[] bytes) {
-    _bytes = bytes;
+  public ScanBasedMatchesFilterOperator(PredicateEvaluator predicateEvaluator,
+      DataSource dataSource, int startDocId, int endDocId) {
+    // TODO Auto-generated constructor stub
   }
 
   @Override
-  public byte[] toBytes() {
-    return _bytes;
-  }
-
-  @Override
-  public List<String> getPropertyNames() {
-    return _FIELDS;
+  protected FilterBlock getNextBlock() {
+    // TODO Auto-generated method stub
+    return null;
   }
 
   @Override
-  public Object getProperty(String field) {
+  public String getOperatorName() {
     // TODO Auto-generated method stub
     return null;
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java
index 26b7e16..684573f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java
@@ -34,17 +34,15 @@ public class MatchesPredicateEvaluatorFactory {
    */
   public static BaseRawValueBasedPredicateEvaluator newRawValueBasedEvaluator(
       MatchesPredicate textMatchPredicate, FieldSpec.DataType dataType) {
-    return new RawValueBasedTextMatchPredicateEvaluator(textMatchPredicate);
+    return new DefaultMatchesPredicateEvaluator(textMatchPredicate);
   }
 
-  public static final class RawValueBasedTextMatchPredicateEvaluator
+  public static final class DefaultMatchesPredicateEvaluator
       extends BaseRawValueBasedPredicateEvaluator {
-    String _query;
-    String _options;
+    private MatchesPredicate _matchesPredicate;
 
-    public RawValueBasedTextMatchPredicateEvaluator(MatchesPredicate 
textMatchPredicate) {
-      _query = textMatchPredicate.getQuery();
-      _options = textMatchPredicate.getQueryOptions();
+    public DefaultMatchesPredicateEvaluator(MatchesPredicate matchesPredicate) 
{
+      this._matchesPredicate = matchesPredicate;
     }
 
     @Override
@@ -58,12 +56,9 @@ public class MatchesPredicateEvaluatorFactory {
           "Text Match is not supported via scanning, its supported only via 
inverted index");
     }
 
-    public String getQueryString() {
-      return _query;
-    }
-
-    public String getQueryOptions() {
-      return _options;
+    public MatchesPredicate getMatchesPredicate(){
+      return _matchesPredicate;
     }
+   
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
index 363f688..4c45850 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.realtime.impl.invertedindex;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.pinot.core.common.Predicate;
 import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
@@ -54,7 +56,11 @@ public class RealtimeInvertedIndexReader implements 
InvertedIndexReader<MutableR
       _bitmaps.get(dictId).checkAndAdd(docId);
     }
   }
-
+  
+  @Override
+  public MutableRoaringBitmap getDocIds(Predicate predicate) {
+    throw new UnsupportedOperationException("Predicate:"+ predicate + " is not 
supported");
+  }
   @Override
   public MutableRoaringBitmap getDocIds(int dictId) {
     ThreadSafeMutableRoaringBitmap bitmap;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
index 3eafa9d..d5dadcf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.creator;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.pinot.common.data.PinotObject;
+
 
 /**
  * Currently only support RoaringBitmap inverted index.
@@ -65,6 +67,12 @@ public interface InvertedIndexCreator extends Closeable {
    * For multi-valued column, adds the dictionary Ids for the next document.
    */
   void add(int[] dictIds, int length);
+  
+  /**
+   * For complex data types such as Map, JSON, TEXT
+   * @param object
+   */
+  void add(PinotObject object);
 
   /**
    * Seals the index and flushes it to disk.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 3536018..7153f3a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -60,6 +60,7 @@ public class V1Constants {
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = 
".sv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = 
".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = 
".bitmap.inv";
+    public static final String LUCENE_INVERTED_INDEX_DIR = ".lucene.inv";
     public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
   }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java
new file mode 100644
index 0000000..6622b57
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.PinotObject;
+import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
+import org.apache.pinot.core.segment.index.ColumnMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LuceneIndexCreator implements InvertedIndexCreator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InvertedIndexCreator.class);
+
+  public static final String VERSION = "7.6.0";
+  // Index file will be flushed after reaching this threshold
+  private static final int MAX_BUFFER_SIZE_MB = 500;
+  private static final Field.Store DEFAULT_STORE = Field.Store.NO;
+  private final Analyzer _analyzer;
+  private final IndexWriter _writer;
+  private final IndexWriterConfig _indexWriterConfig;
+  private final Directory _indexDirectory;
+  // TODO:Figure out a way to avoid this
+  boolean _isText = false;
+
+  public LuceneIndexCreator(ColumnMetadata columnMetadata, File 
outputDirectory) {
+    // TODO: Get IndexConfig and set the different analyzer for each field by 
default we set
+    // StandardAnalyzer and use TextField. This can be expensive and 
inefficient if all we need is
+    // exact match. See keyword analyzer
+    _analyzer = new PerFieldAnalyzerWrapper(new StandardAnalyzer());
+    _indexWriterConfig = new IndexWriterConfig(_analyzer);
+    _indexWriterConfig.setRAMBufferSizeMB(MAX_BUFFER_SIZE_MB);
+    _isText = "TEXT".equalsIgnoreCase(columnMetadata.getObjectType());
+    try {
+      _indexDirectory = FSDirectory.open(outputDirectory.toPath());
+      _writer = new IndexWriter(_indexDirectory, _indexWriterConfig);
+    } catch (IOException e) {
+      LOGGER.error("Encountered error creating LuceneIndexCreator ", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void add(int dictId) {
+    throw new UnsupportedOperationException(
+        "Lucene indexing not supported for dictionary encoded columns");
+  }
+
+  @Override
+  public void add(int[] dictIds, int length) {
+    throw new UnsupportedOperationException(
+        "Lucene indexing not supported for dictionary encoded columns");
+
+  }
+
+  @Override
+  public void add(PinotObject object) {
+    Document document = new Document();
+    List<String> propertyNames = object.getPropertyNames();
+    for (String propertyName : propertyNames) {
+      Field field;
+      // TODO: Figure out a way to avoid special casing Text, have a way to 
get propertyType from
+      // pinotObject?
+      // TODO: Handle list field
+      Object value = object.getProperty(propertyName);
+      if (value.getClass().isAssignableFrom(List.class)) {
+        List<?> list = (List<?>) value;
+        for (Object item : list) {
+          field = new TextField(propertyName, item.toString(), DEFAULT_STORE);
+          document.add(field);
+        }
+      } else {
+        field = new TextField(propertyName, value.toString(), DEFAULT_STORE);
+        document.add(field);
+      }
+    }
+    try {
+      _writer.addDocument(document);
+    } catch (IOException e) {
+      LOGGER.error("Encountered exception while adding doc:{}", 
document.toString(), e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void seal() throws IOException {
+
+  }
+
+  @Override
+  public void close() throws IOException {
+    _writer.close();
+  }
+
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
index c4e30b7..b525427 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
@@ -26,6 +26,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.PinotObject;
 import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -119,6 +120,11 @@ public final class OffHeapBitmapInvertedIndexCreator 
implements InvertedIndexCre
   }
 
   @Override
+  public void add(PinotObject object) {
+   throw new UnsupportedOperationException("Bitmap Indexing not supported for 
Pinot Objects");
+  }
+  
+  @Override
   public void add(int dictId) {
     putInt(_forwardIndexValueBuffer, _nextDocId++, dictId);
     putInt(_invertedIndexLengthBuffer, dictId, 
getInt(_invertedIndexLengthBuffer, dictId) + 1);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
index c358361..83bc69c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.data.PinotObject;
 import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -47,6 +48,11 @@ public final class OnHeapBitmapInvertedIndexCreator 
implements InvertedIndexCrea
   }
 
   @Override
+  public void add(PinotObject object) {
+   throw new UnsupportedOperationException("Bitmap Indexing not supported for 
Pinot Objects");
+  }
+  
+  @Override
   public void add(int dictId) {
     _bitmaps[dictId].add(_nextDocId++);
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index df83533..29b6b46 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.core.segment.index.column;
 
+import java.io.File;
 import java.io.IOException;
 import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.io.reader.DataFileReader;
 import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
 import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader;
@@ -39,6 +41,7 @@ import 
org.apache.pinot.core.segment.index.readers.ImmutableDictionaryReader;
 import org.apache.pinot.core.segment.index.readers.IntDictionary;
 import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
 import org.apache.pinot.core.segment.index.readers.LongDictionary;
+import org.apache.pinot.core.segment.index.readers.LuceneInvertedIndexReader;
 import org.apache.pinot.core.segment.index.readers.OnHeapDoubleDictionary;
 import org.apache.pinot.core.segment.index.readers.OnHeapFloatDictionary;
 import org.apache.pinot.core.segment.index.readers.OnHeapIntDictionary;
@@ -60,7 +63,7 @@ public final class PhysicalColumnIndexContainer implements 
ColumnIndexContainer
   private final ImmutableDictionaryReader _dictionary;
   private final BloomFilterReader _bloomFilterReader;
 
-  public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, 
ColumnMetadata metadata,
+  public PhysicalColumnIndexContainer(File indexDir, SegmentDirectory.Reader 
segmentReader, ColumnMetadata metadata,
       IndexLoadingConfig indexLoadingConfig)
       throws IOException {
     String columnName = metadata.getColumnName();
@@ -105,16 +108,25 @@ public final class PhysicalColumnIndexContainer 
implements ColumnIndexContainer
                 metadata.getBitsPerElement());
       }
       if (loadInvertedIndex) {
-        _invertedIndex =
-            new 
BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, 
ColumnIndexType.INVERTED_INDEX),
-                metadata.getCardinality());
+        if (metadata.getObjectType() == null && 
metadata.getFieldSpec().getDataType() != DataType.BYTES) {
+          _invertedIndex =
+              new 
BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, 
ColumnIndexType.INVERTED_INDEX),
+                  metadata.getCardinality());
+        } else {
+          _invertedIndex = new LuceneInvertedIndexReader(indexDir, metadata);
+        }
       } else {
         _invertedIndex = null;
       }
     } else {
       // Raw index
       _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, 
metadata.getDataType());
-      _invertedIndex = null;
+      if (loadInvertedIndex && (metadata.getObjectType() != null
+          || metadata.getFieldSpec().getDataType() == DataType.BYTES)) {
+        _invertedIndex = new LuceneInvertedIndexReader(indexDir, metadata);
+      } else {
+        _invertedIndex = null;
+      }
       _dictionary = null;
       _bloomFilterReader = null;
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
index 3044457..2734f75 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java
@@ -87,7 +87,7 @@ public final class ColumnDataSource extends DataSource {
       }
     } else {
       // Raw index
-      Preconditions.checkState(invertedIndex == null);
+      //Preconditions.checkState(invertedIndex == null);
     }
 
     _operatorName = "ColumnDataSource [" + columnName + "]";
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java
index 15fa01d..b9a2bec 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java
@@ -20,16 +20,24 @@ package 
org.apache.pinot.core.segment.index.loader.invertedindex;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.HashSet;
 import java.util.Set;
 import javax.annotation.Nonnull;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.data.PinotObject;
+import org.apache.pinot.common.data.objects.JSONObject;
+import org.apache.pinot.common.data.objects.MapObject;
+import org.apache.pinot.common.data.objects.TextObject;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.io.reader.DataFileReader;
 import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
 import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader;
 import org.apache.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader;
+import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.creator.impl.inv.LuceneIndexCreator;
 import 
org.apache.pinot.core.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
 import org.apache.pinot.core.segment.index.ColumnMetadata;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
@@ -70,11 +78,99 @@ public class InvertedIndexHandler {
   public void createInvertedIndices()
       throws IOException {
     for (ColumnMetadata columnMetadata : _invertedIndexColumns) {
-      createInvertedIndexForColumn(columnMetadata);
+      String objectType = columnMetadata.getObjectType();
+      if (objectType == null) {
+        createInvertedIndexForSimpleField(columnMetadata);
+      } else {
+        createInvertedIndexForComplexObject(columnMetadata);
+      }
     }
   }
 
-  private void createInvertedIndexForColumn(ColumnMetadata columnMetadata)
+  @SuppressWarnings("unchecked")
+  private void createInvertedIndexForComplexObject(ColumnMetadata 
columnMetadata)
+      throws IOException {
+    String column = columnMetadata.getColumnName();
+
+    File inProgress = new File(_indexDir, column + ".inv.inprogress");
+    File invertedIndexDir = new File(_indexDir, column + 
V1Constants.Indexes.LUCENE_INVERTED_INDEX_DIR);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+
+      if (invertedIndexDir.exists()) {
+        // Skip creating inverted index if already exists.
+        LOGGER.info("Found inverted index for segment: {}, column: {}", 
_segmentName, column);
+        return;
+      }
+
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run gets interrupted.
+      // Remove inverted index if exists.
+      // For v1 and v2, it's the actual inverted index. For v3, it's the 
temporary inverted index.
+      FileUtils.deleteQuietly(invertedIndexDir);
+    }
+
+    // Create new inverted index for the column.
+    LOGGER.info("Creating new lucene based inverted index for segment: {}, 
column: {}", _segmentName, column);
+    int numDocs = columnMetadata.getTotalDocs();
+    String objectType = columnMetadata.getObjectType();
+    Class<? extends PinotObject> pinotObjectClazz;
+    PinotObject pinotObject = null;
+    try {
+      switch (objectType.toUpperCase()) {
+        case "MAP":
+          pinotObjectClazz = MapObject.class;
+          break;
+        case "JSON":
+          pinotObjectClazz = JSONObject.class;
+          break;
+        case "TEXT":
+          pinotObjectClazz = TextObject.class;
+          break;
+        default:
+          // custom object type.
+          pinotObjectClazz = (Class<? extends PinotObject>) 
Class.forName(objectType);
+      }
+      pinotObject = pinotObjectClazz.getConstructor(new 
Class[]{}).newInstance(new Object[]{});
+    } catch (Exception e) {
+      LOGGER.error("Error pinot object  for type:{}. Skipping inverted index 
creation", objectType);
+      return;
+    }
+
+    try (LuceneIndexCreator luceneIndexCreator = new 
LuceneIndexCreator(columnMetadata, invertedIndexDir)) {
+      try (DataFileReader fwdIndex = getForwardIndexReader(columnMetadata, 
_segmentWriter)) {
+        if (columnMetadata.isSingleValue()) {
+          // Single-value column.
+          VarByteChunkSingleValueReader svFwdIndex = 
(VarByteChunkSingleValueReader) fwdIndex;
+          for (int i = 0; i < numDocs; i++) {
+            byte[] bytes = svFwdIndex.getBytes(i);
+
+            pinotObject.init(bytes);
+            luceneIndexCreator.add(pinotObject);
+          }
+        } else {
+          throw new UnsupportedOperationException("Multi Value not supported 
for complex object types");
+        }
+        luceneIndexCreator.seal();
+      }
+    }
+    String tarGzPath = 
TarGzCompressionUtils.createTarGzOfDirectory(invertedIndexDir.getAbsolutePath());
+
+    // For v3, write the generated inverted index file into the single file 
and remove it.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, new 
File(tarGzPath), ColumnIndexType.INVERTED_INDEX);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created inverted index for segment: {}, column: {}", 
_segmentName, column);
+  }
+
+  private void createInvertedIndexForSimpleField(ColumnMetadata columnMetadata)
       throws IOException {
     String column = columnMetadata.getColumnName();
 
@@ -146,7 +242,11 @@ public class InvertedIndexHandler {
     int numRows = columnMetadata.getTotalDocs();
     int numBitsPerValue = columnMetadata.getBitsPerElement();
     if (columnMetadata.isSingleValue()) {
-      return new FixedBitSingleValueReader(buffer, numRows, numBitsPerValue);
+      if (columnMetadata.hasDictionary()) {
+        return new FixedBitSingleValueReader(buffer, numRows, numBitsPerValue);
+      } else {
+        return new VarByteChunkSingleValueReader(buffer);
+      }
     } else {
       return new FixedBitMultiValueReader(buffer, numRows, 
columnMetadata.getTotalNumberOfEntries(), numBitsPerValue);
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java
index d68af16..d9d830a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java
@@ -22,6 +22,8 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
+
+import org.apache.pinot.core.common.Predicate;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.slf4j.Logger;
@@ -52,6 +54,11 @@ public class BitmapInvertedIndexReader implements 
InvertedIndexReader<ImmutableR
     load(indexDataBuffer);
   }
 
+  @Override
+  public ImmutableRoaringBitmap getDocIds(Predicate predicate) {
+    throw new UnsupportedOperationException("Predicate based evaluation not 
supported for Bitmap based Indexing scheme");
+  }
+  
   /**
    * {@inheritDoc}
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java
index 6a949c0..6b42f4c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java
@@ -20,6 +20,8 @@ package org.apache.pinot.core.segment.index.readers;
 
 import java.io.Closeable;
 
+import org.apache.pinot.core.common.Predicate;
+
 
 public interface InvertedIndexReader<T> extends Closeable {
 
@@ -27,4 +29,10 @@ public interface InvertedIndexReader<T> extends Closeable {
    * Get the document ids for the given dictionary id.
    */
   T getDocIds(int dictId);
+  
+  /**
+   * Get the document id's for the given predicate
+   */
+  T getDocIds(Predicate predicate);
+  
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java
new file mode 100644
index 0000000..576bcc4
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.queryparser.flexible.core.nodes.FieldQueryNode;
+import org.apache.lucene.queryparser.surround.query.FieldsQuery;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.LeafCollector;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.pinot.common.data.FieldSpec.DataType;
+import org.apache.pinot.common.data.FieldSpec.FieldType;
+import org.apache.pinot.common.data.objects.TextObject;
+import org.apache.pinot.core.common.Predicate;
+import org.apache.pinot.core.common.predicate.MatchesPredicate;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.ColumnMetadata;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class LuceneInvertedIndexReader implements 
InvertedIndexReader<MutableRoaringBitmap> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LuceneInvertedIndexReader.class);
+  private final IndexSearcher _searcher;
+  private final Analyzer _analyzer = new PerFieldAnalyzerWrapper(new 
StandardAnalyzer());
+  private Directory _index;
+
+  /**
+   * TODO: Change this to take PinotDataBuffer, work around for now since 
Lucene needs actual
+   * directory
+   * @param metadata
+   * @param indexDir
+   * @param metadata
+   */
+  public LuceneInvertedIndexReader(File segmentIndexDir, ColumnMetadata 
metadata) {
+
+    try {
+      File searchIndexDir = new File(segmentIndexDir.getPath(),
+          metadata.getColumnName() + 
V1Constants.Indexes.LUCENE_INVERTED_INDEX_DIR);
+      _index = FSDirectory.open(searchIndexDir.toPath());
+      IndexReader reader = DirectoryReader.open(_index);
+      _searcher = new IndexSearcher(reader);
+    } catch (IOException e) {
+      LOGGER.error("Encountered error creating LuceneSearchIndexReader ", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    _index.close();
+  }
+
+  @Override
+  public MutableRoaringBitmap getDocIds(int dictId) {
+    throw new UnsupportedOperationException(
+        "DictId based evaluation not supported for Lucene based Indexing 
scheme");
+  }
+
+  @Override
+  public MutableRoaringBitmap getDocIds(Predicate predicate) {
+    MatchesPredicate matchesPredicate = (MatchesPredicate) predicate;
+    MutableRoaringBitmap bitmap =
+        getDocIds(matchesPredicate.getQuery(), 
matchesPredicate.getQueryOptions());
+    return bitmap;
+  }
+
+  public MutableRoaringBitmap getDocIds(String queryStr, String options) {
+    QueryParser queryParser = new QueryParser(TextObject.DEFAULT_FIELD, 
_analyzer);
+    Query query = null;
+    try {
+      query = queryParser.parse(queryStr);
+    } catch (ParseException e) {
+      LOGGER.error("Encountered exception while parsing query {}", queryStr, 
e);
+      throw new RuntimeException(e);
+    }
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    try {
+      Collector collector = createCollector(bitmap);
+      _searcher.search(query, collector);
+    } catch (IOException e) {
+      LOGGER.error("Encountered exception while executing search query {}", 
queryStr, e);
+      throw new RuntimeException(e);
+    }
+    return bitmap;
+  }
+
+  private Collector createCollector(final MutableRoaringBitmap bitmap) {
+    return new LuceneResultCollector(bitmap);
+  }
+
+  public static final class LuceneResultCollector implements Collector {
+    private final MutableRoaringBitmap bitmap;
+
+    public LuceneResultCollector(MutableRoaringBitmap bitmap) {
+      this.bitmap = bitmap;
+    }
+
+    @Override
+    public boolean needsScores() {
+      return false;
+    }
+
+    @Override
+    public LeafCollector getLeafCollector(LeafReaderContext context) throws 
IOException {
+      return new LeafCollector() {
+
+        @Override
+        public void setScorer(Scorer scorer) throws IOException {
+          // ignore
+        }
+
+        @Override
+        public void collect(int doc) throws IOException {
+          bitmap.add(doc);
+        }
+      };
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java
index 22ce9ad..9662d0e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.virtualcolumn;
 import java.io.IOException;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.utils.Pairs;
+import org.apache.pinot.common.utils.Pairs.IntPair;
+import org.apache.pinot.core.common.Predicate;
 import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
 import org.apache.pinot.core.io.reader.DataFileReader;
 import org.apache.pinot.core.io.reader.impl.ChunkReaderContext;
@@ -108,7 +110,10 @@ public class DocIdVirtualColumnProvider extends 
BaseVirtualColumnProvider {
     public Pairs.IntPair getDocIds(int dictId) {
       return new Pairs.IntPair(dictId, dictId);
     }
-
+    @Override
+    public IntPair getDocIds(Predicate predicate) {
+      throw new UnsupportedOperationException("");
+    }
     @Override
     public void close()
         throws IOException {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java
index 6082306..8672fed 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.virtualcolumn;
 import java.io.IOException;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.utils.Pairs;
+import org.apache.pinot.common.utils.Pairs.IntPair;
+import org.apache.pinot.core.common.Predicate;
 import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
 import org.apache.pinot.core.io.reader.DataFileReader;
 import org.apache.pinot.core.io.reader.impl.v1.SortedIndexReader;
@@ -75,7 +77,10 @@ public abstract class SingleStringVirtualColumnProvider 
extends BaseVirtualColum
     public SingleStringInvertedIndex(int length) {
       _length = length;
     }
-
+    @Override
+    public IntPair getDocIds(Predicate predicate) {
+      throw new UnsupportedOperationException("");
+    }
     @Override
     public Pairs.IntPair getDocIds(int dictId) {
       if (dictId == 0) {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneIndexClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneIndexClusterIntegrationTest.java
new file mode 100644
index 0000000..ab4d561
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneIndexClusterIntegrationTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.data.DimensionFieldSpec;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.FieldSpec.DataType;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.tools.data.generator.AvroWriter;
+import org.apache.pinot.tools.query.comparison.StarTreeQueryGenerator;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
+public class LuceneIndexClusterIntegrationTest extends 
BaseClusterIntegrationTest {
+  protected static final String DEFAULT_TABLE_NAME = "myTable";
+  static final long TOTAL_DOCS = 1_000L;
+
+  protected Schema _schema;
+  private StarTreeQueryGenerator _queryGenerator;
+  private String _currentTable;
+
+  @Nonnull
+  @Override
+  protected String getTableName() {
+    return _currentTable;
+  }
+
+  @Nonnull
+  @Override
+  protected String getSchemaFileName() {
+    return null;
+  }
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServers(1);
+
+    _schema = new Schema();
+    FieldSpec jsonFieldSpec = new DimensionFieldSpec();
+    jsonFieldSpec.setDataType(DataType.BYTES);
+    jsonFieldSpec.setDefaultNullValue("{}".getBytes());
+    jsonFieldSpec.setObjectType("JSON");
+    jsonFieldSpec.setName("headers");
+    jsonFieldSpec.setSingleValueField(true);
+    _schema.addField(jsonFieldSpec);
+
+    // Create the tables
+    ArrayList<String> invertedIndexColumns = Lists.newArrayList("headers");
+    addOfflineTable(DEFAULT_TABLE_NAME, null, null, null, null, null, 
SegmentVersion.v1,
+        invertedIndexColumns, null, null);
+
+    setUpSegmentsAndQueryGenerator();
+
+    // Wait for all documents loaded
+    _currentTable = DEFAULT_TABLE_NAME;
+    waitForAllDocsLoaded(10_000);
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return TOTAL_DOCS;
+  }
+  
+  protected void setUpSegmentsAndQueryGenerator() throws Exception {
+    org.apache.avro.Schema avroSchema = AvroWriter.getAvroSchema(_schema);
+    DataFileWriter recordWriter =
+        new DataFileWriter<>(new 
GenericDatumWriter<GenericData.Record>(avroSchema));
+    String parent = "/tmp/luceneTest";
+    File avroFile = new File(parent, "part-" + 0 + ".avro");
+    avroFile.getParentFile().mkdirs();
+    recordWriter.create(avroSchema, avroFile);
+    ObjectMapper mapper = new ObjectMapper();
+    for (int i = 0; i < TOTAL_DOCS; i++) {
+      ObjectNode objectNode = JsonNodeFactory.instance.objectNode();
+      objectNode.put("k1", "value" + i);
+      objectNode.put("k2", "value" + i);
+      String json = mapper.writeValueAsString(objectNode);
+      GenericData.Record record = new GenericData.Record(avroSchema);
+      ByteBuffer byteBuffer = ByteBuffer.wrap(json.getBytes());
+      record.put("headers", byteBuffer);
+      recordWriter.append(record);
+    }
+    recordWriter.close();
+
+    // Unpack the Avro files
+    List<File> avroFiles = Lists.newArrayList(avroFile);
+
+    // Create and upload segments without star tree indexes from Avro data
+    createAndUploadSegments(avroFiles, DEFAULT_TABLE_NAME, false);
+
+  }
+
+  private void createAndUploadSegments(List<File> avroFiles, String tableName,
+      boolean createStarTreeIndex) throws Exception {
+    System.out.println("SEGMENT_DIR" + _segmentDir);
+    TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
+
+    ExecutorService executor = Executors.newCachedThreadPool();
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0, 
_segmentDir, _tarDir, tableName,
+        createStarTreeIndex, null, null, _schema, executor);
+    executor.shutdown();
+    executor.awaitTermination(10, TimeUnit.MINUTES);
+
+    uploadSegments(_tarDir);
+  }
+
+  @Test
+  public void testQueries() throws Exception {
+    String pqlQuery =
+        "Select count(*) from " + DEFAULT_TABLE_NAME + " WHERE headers 
matches('k1:\"value\\-1\"','')";
+    JsonNode pinotResponse = postQuery(pqlQuery);
+    System.out.println(pinotResponse);
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception {
+    dropOfflineTable(DEFAULT_TABLE_NAME);
+
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+}
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 68393be..dd795c1 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -19,7 +19,8 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
   <parent>
@@ -101,6 +102,25 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+      <version>7.6.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-queryparser</artifactId>
+      <version>7.6.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-analyzers-common</artifactId>
+      <version>7.6.0</version>
+    </dependency>
+    <dependency>
       <groupId>org.openjdk.jmh</groupId>
       <artifactId>jmh-generator-annprocess</artifactId>
       <version>1.15</version>
@@ -180,15 +200,20 @@
           <binFileExtensions>
             <unix>.sh</unix>
           </binFileExtensions>
-          <!-- Set the target configuration directory to be used in the bin 
scripts -->
+          <!-- Set the target configuration directory to be used in the bin 
+            scripts -->
           <configurationDirectory>conf</configurationDirectory>
-          <!-- Copy the contents from "/src/main/config" to the target 
configuration directory in the assembled application -->
+          <!-- Copy the contents from "/src/main/config" to the target 
configuration 
+            directory in the assembled application -->
           <copyConfigurationDirectory>false</copyConfigurationDirectory>
-          <!-- Include the target configuration directory in the beginning of 
the classpath declaration in the bin scripts -->
+          <!-- Include the target configuration directory in the beginning 
+            of the classpath declaration in the bin scripts -->
           
<includeConfigurationDirectoryInClasspath>false</includeConfigurationDirectoryInClasspath>
           
<assembleDirectory>${project.build.directory}/${project.artifactId}-pkg</assembleDirectory>
           <!-- Extra JVM arguments that will be included in the bin scripts -->
-          <extraJvmArguments>-Xms24G -Xmx24G 
-Dlog4j.configuration=log4j.properties</extraJvmArguments>
+          <extraJvmArguments>-Xms24G -Xmx24G
+            -Dlog4j.configuration=log4j.properties
+          </extraJvmArguments>
           <!-- Generate bin scripts for windows and unix pr default -->
           <platforms>
             <platform>unix</platform>
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/LuceneBenchmark.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/LuceneBenchmark.java
new file mode 100644
index 0000000..2af2ed5
--- /dev/null
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/LuceneBenchmark.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.WildcardQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.pinot.core.segment.index.readers.LuceneInvertedIndexReader;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+public class LuceneBenchmark {
+
+  public static void main(String[] args) throws IOException, ParseException {
+    Map<String, Analyzer> analyzerMap = new HashMap<>();
+    Analyzer wrapper = new PerFieldAnalyzerWrapper(new StandardAnalyzer());
+    Directory index = new RAMDirectory();
+    IndexWriterConfig config = new IndexWriterConfig(wrapper);
+
+    try (IndexWriter writer = new IndexWriter(index, config)) {
+      for (int i = 0; i < 1000; i++) {
+        Document doc = new Document();
+        doc.add(new TextField("k1", "value-" + i, Field.Store.YES));
+        doc.add(new TextField("k2", "value-" + i, Field.Store.YES));
+        writer.addDocument(doc);
+      }
+    }
+    String querystr;
+    querystr = "k1:\"value1?0\"";
+    Query q = new QueryParser("Content", wrapper).parse(querystr);
+    q = new WildcardQuery(new Term("k1", QueryParser.escape("value1*")));
+
+    // 3. searching
+    IndexReader reader = DirectoryReader.open(index);
+    IndexSearcher searcher = new IndexSearcher(reader);
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    Collector collector = new 
LuceneInvertedIndexReader.LuceneResultCollector(bitmap);
+    searcher.search(q, collector);
+
+    // 4. display results
+    System.out.println("Query string: " + querystr);
+    System.out.println("Found " + bitmap.getCardinality() + " hits.");
+
+  }
+}
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java 
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java
index 838b6d1..c299257 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java
@@ -36,7 +36,7 @@ public class ZookeeperLauncher {
   private ZkServer _zkServer;
 
   public ZookeeperLauncher() {
-    this("/tmp");
+    this(org.apache.commons.io.FileUtils.getTempDirectoryPath());
   }
 
   public ZookeeperLauncher(String baseTempDir) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to