This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch nested-object-indexing-1 in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 224299deccf308ddfdab2e59556d413944596329 Author: kishore gopalakrishna <g.kish...@gmail.com> AuthorDate: Thu Jan 24 19:53:29 2019 -0800 Wiring up end to end to support indexing nested fields on complex objects --- .../org/apache/pinot/common/data/FieldSpec.java | 3 + .../pinot/common/data/objects/TextObject.java | 3 +- pinot-core/pom.xml | 19 ++- .../immutable/ImmutableSegmentLoader.java | 4 +- .../io/reader/impl/v1/SortedIndexReaderImpl.java | 6 + .../core/operator/filter/FilterOperatorUtils.java | 8 +- .../filter/IndexBasedMatchesFilterOperator.java | 86 +++++++++++ .../filter/ScanBasedMatchesFilterOperator.java | 33 ++-- .../MatchesPredicateEvaluatorFactory.java | 21 +-- .../invertedindex/RealtimeInvertedIndexReader.java | 8 +- .../core/segment/creator/InvertedIndexCreator.java | 8 + .../core/segment/creator/impl/V1Constants.java | 1 + .../creator/impl/inv/LuceneIndexCreator.java | 127 +++++++++++++++ .../inv/OffHeapBitmapInvertedIndexCreator.java | 6 + .../impl/inv/OnHeapBitmapInvertedIndexCreator.java | 6 + .../index/column/PhysicalColumnIndexContainer.java | 22 ++- .../index/data/source/ColumnDataSource.java | 2 +- .../loader/invertedindex/InvertedIndexHandler.java | 106 ++++++++++++- .../index/readers/BitmapInvertedIndexReader.java | 7 + .../segment/index/readers/InvertedIndexReader.java | 8 + .../index/readers/LuceneInvertedIndexReader.java | 157 +++++++++++++++++++ .../virtualcolumn/DocIdVirtualColumnProvider.java | 7 +- .../SingleStringVirtualColumnProvider.java | 7 +- .../tests/LuceneIndexClusterIntegrationTest.java | 172 +++++++++++++++++++++ pinot-perf/pom.xml | 35 ++++- .../org/apache/pinot/perf/LuceneBenchmark.java | 80 ++++++++++ .../apache/pinot/tools/perf/ZookeeperLauncher.java | 2 +- 27 files changed, 887 insertions(+), 57 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java index 080f0e7..ad5a5a3 100644 --- 
a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java @@ -335,6 +335,9 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, ConfigNodeLife case STRING: jsonSchema.set("type", convertStringsToJsonArray("null", "string")); return jsonSchema; + case BYTES: + jsonSchema.set("type", convertStringsToJsonArray("null", "bytes")); + return jsonSchema; default: throw new UnsupportedOperationException(); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java b/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java index cf5f27e..c171c40 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java @@ -27,7 +27,8 @@ import com.google.common.collect.Lists; public class TextObject implements PinotObject { byte[] _bytes; - private static List<String> _FIELDS = Lists.newArrayList("Content"); + public static String DEFAULT_FIELD = "Content"; + private static List<String> _FIELDS = Lists.newArrayList(DEFAULT_FIELD); @Override public void init(byte[] bytes) { diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml index 8f4efe2..4c8215a 100644 --- a/pinot-core/pom.xml +++ b/pinot-core/pom.xml @@ -196,7 +196,24 @@ </exclusion> </exclusions> </dependency> - + + <!-- Lucene --> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-core</artifactId> + <version>7.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-queryparser</artifactId> + <version>7.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-analyzers-common</artifactId> + <version>7.6.0</version> + </dependency> + <!-- test --> <dependency> <groupId>org.mockito</groupId> diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java index 2ddb80c..3d7ac31 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java @@ -118,8 +118,8 @@ public class ImmutableSegmentLoader { SegmentDirectory.Reader segmentReader = segmentDirectory.createReader(); Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>(); for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) { - indexContainerMap - .put(entry.getKey(), new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), indexLoadingConfig)); + indexContainerMap.put(entry.getKey(), + new PhysicalColumnIndexContainer(indexDir, segmentReader, entry.getValue(), indexLoadingConfig)); } if (schema == null) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java index 0018b8c..a393dd4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/io/reader/impl/v1/SortedIndexReaderImpl.java @@ -21,6 +21,8 @@ package org.apache.pinot.core.io.reader.impl.v1; import com.google.common.base.Preconditions; import java.io.IOException; import org.apache.pinot.common.utils.Pairs; +import org.apache.pinot.common.utils.Pairs.IntPair; +import org.apache.pinot.core.common.Predicate; import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader; import org.apache.pinot.core.io.reader.ReaderContext; import org.apache.pinot.core.io.util.FixedByteValueReaderWriter; @@ -120,6 +122,10 @@ public class SortedIndexReaderImpl 
extends BaseSingleColumnSingleValueReader<Sor } @Override + public IntPair getDocIds(Predicate predicate) { + throw new UnsupportedOperationException(""); + } + @Override public Pairs.IntPair getDocIds(int dictId) { return new Pairs.IntPair(_reader.getInt(2 * dictId), _reader.getInt(2 * dictId + 1)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java index bc46b2f..f4354ae 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java @@ -56,7 +56,13 @@ public class FilterOperatorUtils { // Use inverted index if the predicate type is not RANGE or REGEXP_LIKE for efficiency DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata(); Predicate.Type predicateType = predicateEvaluator.getPredicateType(); - if (dataSourceMetadata.hasInvertedIndex() && (predicateType != Predicate.Type.RANGE) && (predicateType + if(predicateType == Predicate.Type.MATCHES) { + if(dataSourceMetadata.hasInvertedIndex()) { + return new IndexBasedMatchesFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId); + } else { + return new ScanBasedMatchesFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId); + } + } else if (dataSourceMetadata.hasInvertedIndex() && (predicateType != Predicate.Type.RANGE) && (predicateType != Predicate.Type.REGEXP_LIKE)) { if (dataSourceMetadata.isSorted()) { return new SortedInvertedIndexBasedFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/IndexBasedMatchesFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/IndexBasedMatchesFilterOperator.java new file mode 100644 index 0000000..50f39b7 --- /dev/null +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/IndexBasedMatchesFilterOperator.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.filter; + +import com.google.common.base.Preconditions; +import org.apache.lucene.search.TopDocs; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.common.predicate.MatchesPredicate; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; +import org.apache.pinot.core.operator.filter.predicate.MatchesPredicateEvaluatorFactory.DefaultMatchesPredicateEvaluator; +import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IndexBasedMatchesFilterOperator extends BaseFilterOperator { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IndexBasedMatchesFilterOperator.class); + private static final String OPERATOR_NAME = 
"IndexBasedMatchesFilterOperator"; + + private final DataSource _dataSource; + private final int _startDocId; + // TODO: change it to exclusive + // Inclusive + private final int _endDocId; + private MatchesPredicate _matchesPredicate; + + public IndexBasedMatchesFilterOperator(PredicateEvaluator predicateEvaluator, + DataSource dataSource, int startDocId, int endDocId) { + // NOTE: + // Predicate that is always evaluated as true or false should not be passed into the + // TextMatchFilterOperator for + // performance concern. + // If predicate is always evaluated as true, use MatchAllFilterOperator; if predicate is always + // evaluated as false, + // use EmptyFilterOperator. + Preconditions + .checkArgument(!predicateEvaluator.isAlwaysTrue() && !predicateEvaluator.isAlwaysFalse()); + Preconditions.checkArgument(predicateEvaluator instanceof DefaultMatchesPredicateEvaluator); + + DefaultMatchesPredicateEvaluator evaluator = + (DefaultMatchesPredicateEvaluator) predicateEvaluator; + _matchesPredicate = evaluator.getMatchesPredicate(); + _dataSource = dataSource; + _startDocId = startDocId; + _endDocId = endDocId; + } + + @Override + protected FilterBlock getNextBlock() { + + InvertedIndexReader invertedIndex = _dataSource.getInvertedIndex(); + MutableRoaringBitmap bitmap = (MutableRoaringBitmap) invertedIndex.getDocIds(_matchesPredicate); + + boolean exclusive = false; + ImmutableRoaringBitmap[] bitmapArray = new ImmutableRoaringBitmap[] { + bitmap + }; + return new FilterBlock(new BitmapDocIdSet(bitmapArray, _startDocId, _endDocId, exclusive)); + } + + @Override + public String getOperatorName() { + return OPERATOR_NAME; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedMatchesFilterOperator.java similarity index 59% copy from pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java copy to 
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedMatchesFilterOperator.java index cf5f27e..1f730e1 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/data/objects/TextObject.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedMatchesFilterOperator.java @@ -16,36 +16,27 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.common.data.objects; +package org.apache.pinot.core.operator.filter; -import java.util.List; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; -import org.apache.pinot.common.data.PinotObject; +public class ScanBasedMatchesFilterOperator extends BaseFilterOperator { -import com.google.common.collect.Lists; - -public class TextObject implements PinotObject { - - byte[] _bytes; - private static List<String> _FIELDS = Lists.newArrayList("Content"); - - @Override - public void init(byte[] bytes) { - _bytes = bytes; + public ScanBasedMatchesFilterOperator(PredicateEvaluator predicateEvaluator, + DataSource dataSource, int startDocId, int endDocId) { + // TODO Auto-generated constructor stub } @Override - public byte[] toBytes() { - return _bytes; - } - - @Override - public List<String> getPropertyNames() { - return _FIELDS; + protected FilterBlock getNextBlock() { + // TODO Auto-generated method stub + return null; } @Override - public Object getProperty(String field) { + public String getOperatorName() { // TODO Auto-generated method stub return null; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java index 26b7e16..684573f 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/MatchesPredicateEvaluatorFactory.java @@ -34,17 +34,15 @@ public class MatchesPredicateEvaluatorFactory { */ public static BaseRawValueBasedPredicateEvaluator newRawValueBasedEvaluator( MatchesPredicate textMatchPredicate, FieldSpec.DataType dataType) { - return new RawValueBasedTextMatchPredicateEvaluator(textMatchPredicate); + return new DefaultMatchesPredicateEvaluator(textMatchPredicate); } - public static final class RawValueBasedTextMatchPredicateEvaluator + public static final class DefaultMatchesPredicateEvaluator extends BaseRawValueBasedPredicateEvaluator { - String _query; - String _options; + private MatchesPredicate _matchesPredicate; - public RawValueBasedTextMatchPredicateEvaluator(MatchesPredicate textMatchPredicate) { - _query = textMatchPredicate.getQuery(); - _options = textMatchPredicate.getQueryOptions(); + public DefaultMatchesPredicateEvaluator(MatchesPredicate matchesPredicate) { + this._matchesPredicate = matchesPredicate; } @Override @@ -58,12 +56,9 @@ public class MatchesPredicateEvaluatorFactory { "Text Match is not supported via scanning, its supported only via inverted index"); } - public String getQueryString() { - return _query; - } - - public String getQueryOptions() { - return _options; + public MatchesPredicate getMatchesPredicate(){ + return _matchesPredicate; } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java index 363f688..4c45850 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeInvertedIndexReader.java @@ -21,6 +21,8 @@ package org.apache.pinot.core.realtime.impl.invertedindex; import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.pinot.core.common.Predicate; import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -54,7 +56,11 @@ public class RealtimeInvertedIndexReader implements InvertedIndexReader<MutableR _bitmaps.get(dictId).checkAndAdd(docId); } } - + + @Override + public MutableRoaringBitmap getDocIds(Predicate predicate) { + throw new UnsupportedOperationException("Predicate:"+ predicate + " is not supported"); + } @Override public MutableRoaringBitmap getDocIds(int dictId) { ThreadSafeMutableRoaringBitmap bitmap; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java index 3eafa9d..d5dadcf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java @@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.creator; import java.io.Closeable; import java.io.IOException; +import org.apache.pinot.common.data.PinotObject; + /** * Currently only support RoaringBitmap inverted index. @@ -65,6 +67,12 @@ public interface InvertedIndexCreator extends Closeable { * For multi-valued column, adds the dictionary Ids for the next document. */ void add(int[] dictIds, int length); + + /** + * For complex data types such as Map, JSON, TEXT + * @param object + */ + void add(PinotObject object); /** * Seals the index and flushes it to disk. 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java index 3536018..7153f3a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java @@ -60,6 +60,7 @@ public class V1Constants { public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd"; public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd"; public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv"; + public static final String LUCENE_INVERTED_INDEX_DIR = ".lucene.inv"; public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom"; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java new file mode 100644 index 0000000..6622b57 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/LuceneIndexCreator.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.creator.impl.inv; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.pinot.common.data.FieldSpec; +import org.apache.pinot.common.data.PinotObject; +import org.apache.pinot.core.segment.creator.InvertedIndexCreator; +import org.apache.pinot.core.segment.index.ColumnMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LuceneIndexCreator implements InvertedIndexCreator { + + private static final Logger LOGGER = LoggerFactory.getLogger(InvertedIndexCreator.class); + + public static final String VERSION = "7.6.0"; + // Index file will be flushed after reaching this threshold + private static final int MAX_BUFFER_SIZE_MB = 500; + private static final Field.Store DEFAULT_STORE = Field.Store.NO; + private final Analyzer _analyzer; + private final IndexWriter _writer; + private final IndexWriterConfig _indexWriterConfig; + private final Directory _indexDirectory; + // TODO:Figure out a way to avoid this + boolean _isText = false; + + public LuceneIndexCreator(ColumnMetadata columnMetadata, File outputDirectory) { + // TODO: Get IndexConfig and set the different analyzer for each field by default we set + // StandardAnalyzer and use TextField. 
This can be expensive and inefficient if all we need is + // exact match. See keyword analyzer + _analyzer = new PerFieldAnalyzerWrapper(new StandardAnalyzer()); + _indexWriterConfig = new IndexWriterConfig(_analyzer); + _indexWriterConfig.setRAMBufferSizeMB(MAX_BUFFER_SIZE_MB); + _isText = "TEXT".equalsIgnoreCase(columnMetadata.getObjectType()); + try { + _indexDirectory = FSDirectory.open(outputDirectory.toPath()); + _writer = new IndexWriter(_indexDirectory, _indexWriterConfig); + } catch (IOException e) { + LOGGER.error("Encountered error creating LuceneIndexCreator ", e); + throw new RuntimeException(e); + } + } + + @Override + public void add(int dictId) { + throw new UnsupportedOperationException( + "Lucene indexing not supported for dictionary encoded columns"); + } + + @Override + public void add(int[] dictIds, int length) { + throw new UnsupportedOperationException( + "Lucene indexing not supported for dictionary encoded columns"); + + } + + @Override + public void add(PinotObject object) { + Document document = new Document(); + List<String> propertyNames = object.getPropertyNames(); + for (String propertyName : propertyNames) { + Field field; + // TODO: Figure out a way to avoid special casing Text, have a way to get propertyType from + // pinotObject? 
+ // TODO: Handle list field + Object value = object.getProperty(propertyName); + if (value.getClass().isAssignableFrom(List.class)) { + List<?> list = (List<?>) value; + for (Object item : list) { + field = new TextField(propertyName, item.toString(), DEFAULT_STORE); + document.add(field); + } + } else { + field = new TextField(propertyName, value.toString(), DEFAULT_STORE); + document.add(field); + } + } + try { + _writer.addDocument(document); + } catch (IOException e) { + LOGGER.error("Encountered exception while adding doc:{}", document.toString(), e); + throw new RuntimeException(e); + } + } + + @Override + public void seal() throws IOException { + + } + + @Override + public void close() throws IOException { + _writer.close(); + } + +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java index c4e30b7..b525427 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java @@ -26,6 +26,7 @@ import java.io.FileOutputStream; import java.io.IOException; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.data.FieldSpec; +import org.apache.pinot.common.data.PinotObject; import org.apache.pinot.core.segment.creator.InvertedIndexCreator; import org.apache.pinot.core.segment.creator.impl.V1Constants; import org.apache.pinot.core.segment.memory.PinotDataBuffer; @@ -119,6 +120,11 @@ public final class OffHeapBitmapInvertedIndexCreator implements InvertedIndexCre } @Override + public void add(PinotObject object) { + throw new UnsupportedOperationException("Bitmap Indexing not supported for Pinot Objects"); + } + + @Override public void add(int dictId) { putInt(_forwardIndexValueBuffer, 
_nextDocId++, dictId); putInt(_invertedIndexLengthBuffer, dictId, getInt(_invertedIndexLengthBuffer, dictId) + 1); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java index c358361..83bc69c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.data.PinotObject; import org.apache.pinot.core.segment.creator.InvertedIndexCreator; import org.apache.pinot.core.segment.creator.impl.V1Constants; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -47,6 +48,11 @@ public final class OnHeapBitmapInvertedIndexCreator implements InvertedIndexCrea } @Override + public void add(PinotObject object) { + throw new UnsupportedOperationException("Bitmap Indexing not supported for Pinot Objects"); + } + + @Override public void add(int dictId) { _bitmaps[dictId].add(_nextDocId++); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java index df83533..29b6b46 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java @@ -18,8 +18,10 @@ */ package org.apache.pinot.core.segment.index.column; +import java.io.File; import java.io.IOException; import org.apache.pinot.common.data.FieldSpec; +import 
org.apache.pinot.common.data.FieldSpec.DataType; import org.apache.pinot.core.io.reader.DataFileReader; import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader; import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader; @@ -39,6 +41,7 @@ import org.apache.pinot.core.segment.index.readers.ImmutableDictionaryReader; import org.apache.pinot.core.segment.index.readers.IntDictionary; import org.apache.pinot.core.segment.index.readers.InvertedIndexReader; import org.apache.pinot.core.segment.index.readers.LongDictionary; +import org.apache.pinot.core.segment.index.readers.LuceneInvertedIndexReader; import org.apache.pinot.core.segment.index.readers.OnHeapDoubleDictionary; import org.apache.pinot.core.segment.index.readers.OnHeapFloatDictionary; import org.apache.pinot.core.segment.index.readers.OnHeapIntDictionary; @@ -60,7 +63,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer private final ImmutableDictionaryReader _dictionary; private final BloomFilterReader _bloomFilterReader; - public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata, + public PhysicalColumnIndexContainer(File indexDir, SegmentDirectory.Reader segmentReader, ColumnMetadata metadata, IndexLoadingConfig indexLoadingConfig) throws IOException { String columnName = metadata.getColumnName(); @@ -105,16 +108,25 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer metadata.getBitsPerElement()); } if (loadInvertedIndex) { - _invertedIndex = - new BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX), - metadata.getCardinality()); + if (metadata.getObjectType() == null && metadata.getFieldSpec().getDataType() != DataType.BYTES) { + _invertedIndex = + new BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX), + metadata.getCardinality()); + } else { + _invertedIndex = new 
LuceneInvertedIndexReader(indexDir, metadata); + } } else { _invertedIndex = null; } } else { // Raw index _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType()); - _invertedIndex = null; + if (loadInvertedIndex && (metadata.getObjectType() != null + || metadata.getFieldSpec().getDataType() == DataType.BYTES)) { + _invertedIndex = new LuceneInvertedIndexReader(indexDir, metadata); + } else { + _invertedIndex = null; + } _dictionary = null; _bloomFilterReader = null; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java index 3044457..2734f75 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/data/source/ColumnDataSource.java @@ -87,7 +87,7 @@ public final class ColumnDataSource extends DataSource { } } else { // Raw index - Preconditions.checkState(invertedIndex == null); + //Preconditions.checkState(invertedIndex == null); } _operatorName = "ColumnDataSource [" + columnName + "]"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java index 15fa01d..b9a2bec 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/InvertedIndexHandler.java @@ -20,16 +20,24 @@ package org.apache.pinot.core.segment.index.loader.invertedindex; import java.io.File; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.HashSet; import java.util.Set; import javax.annotation.Nonnull; import org.apache.commons.io.FileUtils; +import 
org.apache.pinot.common.data.PinotObject; +import org.apache.pinot.common.data.objects.JSONObject; +import org.apache.pinot.common.data.objects.MapObject; +import org.apache.pinot.common.data.objects.TextObject; +import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; import org.apache.pinot.core.io.reader.DataFileReader; import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader; import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader; import org.apache.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader; +import org.apache.pinot.core.io.reader.impl.v1.VarByteChunkSingleValueReader; import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.core.segment.creator.impl.inv.LuceneIndexCreator; import org.apache.pinot.core.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator; import org.apache.pinot.core.segment.index.ColumnMetadata; import org.apache.pinot.core.segment.index.SegmentMetadataImpl; @@ -70,11 +78,99 @@ public class InvertedIndexHandler { public void createInvertedIndices() throws IOException { for (ColumnMetadata columnMetadata : _invertedIndexColumns) { - createInvertedIndexForColumn(columnMetadata); + String objectType = columnMetadata.getObjectType(); + if (objectType == null) { + createInvertedIndexForSimpleField(columnMetadata); + } else { + createInvertedIndexForComplexObject(columnMetadata); + } } } - private void createInvertedIndexForColumn(ColumnMetadata columnMetadata) + @SuppressWarnings("unchecked") + private void createInvertedIndexForComplexObject(ColumnMetadata columnMetadata) + throws IOException { + String column = columnMetadata.getColumnName(); + + File inProgress = new File(_indexDir, column + ".inv.inprogress"); + File invertedIndexDir = new File(_indexDir, column + V1Constants.Indexes.LUCENE_INVERTED_INDEX_DIR); + + if (!inProgress.exists()) { + // Marker file does not exist, which means last run 
ended normally. + + if (invertedIndexDir.exists()) { + // Skip creating inverted index if already exists. + LOGGER.info("Found inverted index for segment: {}, column: {}", _segmentName, column); + return; + } + + // Create a marker file. + FileUtils.touch(inProgress); + } else { + // Marker file exists, which means last run gets interrupted. + // Remove inverted index if exists. + // For v1 and v2, it's the actual inverted index. For v3, it's the temporary inverted index. + FileUtils.deleteQuietly(invertedIndexDir); + } + + // Create new inverted index for the column. + LOGGER.info("Creating new lucene based inverted index for segment: {}, column: {}", _segmentName, column); + int numDocs = columnMetadata.getTotalDocs(); + String objectType = columnMetadata.getObjectType(); + Class<? extends PinotObject> pinotObjectClazz; + PinotObject pinotObject = null; + try { + switch (objectType.toUpperCase()) { + case "MAP": + pinotObjectClazz = MapObject.class; + break; + case "JSON": + pinotObjectClazz = JSONObject.class; + break; + case "TEXT": + pinotObjectClazz = TextObject.class; + break; + default: + // custom object type. + pinotObjectClazz = (Class<? extends PinotObject>) Class.forName(objectType); + } + pinotObject = pinotObjectClazz.getConstructor(new Class[]{}).newInstance(new Object[]{}); + } catch (Exception e) { + LOGGER.error("Error pinot object for type:{}. Skipping inverted index creation", objectType); + return; + } + + try (LuceneIndexCreator luceneIndexCreator = new LuceneIndexCreator(columnMetadata, invertedIndexDir)) { + try (DataFileReader fwdIndex = getForwardIndexReader(columnMetadata, _segmentWriter)) { + if (columnMetadata.isSingleValue()) { + // Single-value column. 
+ VarByteChunkSingleValueReader svFwdIndex = (VarByteChunkSingleValueReader) fwdIndex; + for (int i = 0; i < numDocs; i++) { + byte[] bytes = svFwdIndex.getBytes(i); + + pinotObject.init(bytes); + luceneIndexCreator.add(pinotObject); + } + } else { + throw new UnsupportedOperationException("Multi Value not supported for complex object types"); + } + luceneIndexCreator.seal(); + } + } + String tarGzPath = TarGzCompressionUtils.createTarGzOfDirectory(invertedIndexDir.getAbsolutePath()); + + // For v3, write the generated inverted index file into the single file and remove it. + if (_segmentVersion == SegmentVersion.v3) { + LoaderUtils.writeIndexToV3Format(_segmentWriter, column, new File(tarGzPath), ColumnIndexType.INVERTED_INDEX); + } + + // Delete the marker file. + FileUtils.deleteQuietly(inProgress); + + LOGGER.info("Created inverted index for segment: {}, column: {}", _segmentName, column); + } + + private void createInvertedIndexForSimpleField(ColumnMetadata columnMetadata) throws IOException { String column = columnMetadata.getColumnName(); @@ -146,7 +242,11 @@ public class InvertedIndexHandler { int numRows = columnMetadata.getTotalDocs(); int numBitsPerValue = columnMetadata.getBitsPerElement(); if (columnMetadata.isSingleValue()) { - return new FixedBitSingleValueReader(buffer, numRows, numBitsPerValue); + if (columnMetadata.hasDictionary()) { + return new FixedBitSingleValueReader(buffer, numRows, numBitsPerValue); + } else { + return new VarByteChunkSingleValueReader(buffer); + } } else { return new FixedBitMultiValueReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(), numBitsPerValue); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java index d68af16..d9d830a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BitmapInvertedIndexReader.java @@ -22,6 +22,8 @@ import java.io.File; import java.io.IOException; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; + +import org.apache.pinot.core.common.Predicate; import org.apache.pinot.core.segment.memory.PinotDataBuffer; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.slf4j.Logger; @@ -52,6 +54,11 @@ public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableR load(indexDataBuffer); } + @Override + public ImmutableRoaringBitmap getDocIds(Predicate predicate) { + throw new UnsupportedOperationException("Predicate based evaluation not supported for Bitmap based Indexing scheme"); + } + /** * {@inheritDoc} */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java index 6a949c0..6b42f4c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/InvertedIndexReader.java @@ -20,6 +20,8 @@ package org.apache.pinot.core.segment.index.readers; import java.io.Closeable; +import org.apache.pinot.core.common.Predicate; + public interface InvertedIndexReader<T> extends Closeable { @@ -27,4 +29,10 @@ public interface InvertedIndexReader<T> extends Closeable { * Get the document ids for the given dictionary id. 
*/ T getDocIds(int dictId); + + /** + * Get the document ids for the given predicate. + */ + T getDocIds(Predicate predicate); + } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java new file mode 100644 index 0000000..576bcc4 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneInvertedIndexReader.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.core.segment.index.readers; + +import java.io.File; +import java.io.IOException; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.Term; +import org.apache.lucene.queryparser.classic.ParseException; +import org.apache.lucene.queryparser.classic.QueryParser; +import org.apache.lucene.queryparser.flexible.core.nodes.FieldQueryNode; +import org.apache.lucene.queryparser.surround.query.FieldsQuery; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.pinot.common.data.FieldSpec.DataType; +import org.apache.pinot.common.data.FieldSpec.FieldType; +import org.apache.pinot.common.data.objects.TextObject; +import org.apache.pinot.core.common.Predicate; +import org.apache.pinot.core.common.predicate.MatchesPredicate; +import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.core.segment.index.ColumnMetadata; +import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class LuceneInvertedIndexReader implements InvertedIndexReader<MutableRoaringBitmap> { + + private static final Logger LOGGER = LoggerFactory.getLogger(LuceneInvertedIndexReader.class); + private final IndexSearcher _searcher; + private final 
Analyzer _analyzer = new PerFieldAnalyzerWrapper(new StandardAnalyzer()); + private Directory _index; + + /** + * TODO: Change this to take a PinotDataBuffer; this is a workaround for now since Lucene needs an + * actual directory. + * + * @param segmentIndexDir segment index directory that holds the column's Lucene index directory + * @param metadata metadata of the column; its column name is used to locate the index directory + */ + public LuceneInvertedIndexReader(File segmentIndexDir, ColumnMetadata metadata) { + + try { + File searchIndexDir = new File(segmentIndexDir.getPath(), + metadata.getColumnName() + V1Constants.Indexes.LUCENE_INVERTED_INDEX_DIR); + _index = FSDirectory.open(searchIndexDir.toPath()); + IndexReader reader = DirectoryReader.open(_index); + _searcher = new IndexSearcher(reader); + } catch (IOException e) { + LOGGER.error("Encountered error creating LuceneSearchIndexReader ", e); + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + _index.close(); + } + + @Override + public MutableRoaringBitmap getDocIds(int dictId) { + throw new UnsupportedOperationException( + "DictId based evaluation not supported for Lucene based Indexing scheme"); + } + + @Override + public MutableRoaringBitmap getDocIds(Predicate predicate) { + MatchesPredicate matchesPredicate = (MatchesPredicate) predicate; + MutableRoaringBitmap bitmap = + getDocIds(matchesPredicate.getQuery(), matchesPredicate.getQueryOptions()); + return bitmap; + } + + public MutableRoaringBitmap getDocIds(String queryStr, String options) { + QueryParser queryParser = new QueryParser(TextObject.DEFAULT_FIELD, _analyzer); + Query query = null; + try { + query = queryParser.parse(queryStr); + } catch (ParseException e) { + LOGGER.error("Encountered exception while parsing query {}", queryStr, e); + throw new RuntimeException(e); + } + MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); + try { + Collector collector = createCollector(bitmap); + _searcher.search(query, collector); + } catch (IOException e) { + LOGGER.error("Encountered exception while executing search query {}", queryStr, e); +
throw new RuntimeException(e); + } + return bitmap; + } + + private Collector createCollector(final MutableRoaringBitmap bitmap) { + return new LuceneResultCollector(bitmap); + } + + public static final class LuceneResultCollector implements Collector { + private final MutableRoaringBitmap bitmap; + + public LuceneResultCollector(MutableRoaringBitmap bitmap) { + this.bitmap = bitmap; + } + + @Override + public boolean needsScores() { + return false; + } + + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { + return new LeafCollector() { + + @Override + public void setScorer(Scorer scorer) throws IOException { + // ignore + } + + @Override + public void collect(int doc) throws IOException { + bitmap.add(doc); + } + }; + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java index 22ce9ad..9662d0e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/DocIdVirtualColumnProvider.java @@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.virtualcolumn; import java.io.IOException; import org.apache.pinot.common.data.FieldSpec; import org.apache.pinot.common.utils.Pairs; +import org.apache.pinot.common.utils.Pairs.IntPair; +import org.apache.pinot.core.common.Predicate; import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader; import org.apache.pinot.core.io.reader.DataFileReader; import org.apache.pinot.core.io.reader.impl.ChunkReaderContext; @@ -108,7 +110,10 @@ public class DocIdVirtualColumnProvider extends BaseVirtualColumnProvider { public Pairs.IntPair getDocIds(int dictId) { return new Pairs.IntPair(dictId, dictId); } - + @Override + public IntPair getDocIds(Predicate predicate) { + throw new 
UnsupportedOperationException(""); + } @Override public void close() throws IOException { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java index 6082306..8672fed 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/SingleStringVirtualColumnProvider.java @@ -21,6 +21,8 @@ package org.apache.pinot.core.segment.virtualcolumn; import java.io.IOException; import org.apache.pinot.common.data.FieldSpec; import org.apache.pinot.common.utils.Pairs; +import org.apache.pinot.common.utils.Pairs.IntPair; +import org.apache.pinot.core.common.Predicate; import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader; import org.apache.pinot.core.io.reader.DataFileReader; import org.apache.pinot.core.io.reader.impl.v1.SortedIndexReader; @@ -75,7 +77,10 @@ public abstract class SingleStringVirtualColumnProvider extends BaseVirtualColum public SingleStringInvertedIndex(int length) { _length = length; } - + @Override + public IntPair getDocIds(Predicate predicate) { + throw new UnsupportedOperationException(""); + } @Override public Pairs.IntPair getDocIds(int dictId) { if (dictId == 0) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneIndexClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneIndexClusterIntegrationTest.java new file mode 100644 index 0000000..ab4d561 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneIndexClusterIntegrationTest.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nonnull; + +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.data.DimensionFieldSpec; +import org.apache.pinot.common.data.FieldSpec; +import org.apache.pinot.common.data.FieldSpec.DataType; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.core.indexsegment.generator.SegmentVersion; +import org.apache.pinot.tools.data.generator.AvroWriter; +import org.apache.pinot.tools.query.comparison.StarTreeQueryGenerator; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; 
+import com.google.common.collect.Lists; + +public class LuceneIndexClusterIntegrationTest extends BaseClusterIntegrationTest { + protected static final String DEFAULT_TABLE_NAME = "myTable"; + static final long TOTAL_DOCS = 1_000L; + + protected Schema _schema; + private StarTreeQueryGenerator _queryGenerator; + private String _currentTable; + + @Nonnull + @Override + protected String getTableName() { + return _currentTable; + } + + @Nonnull + @Override + protected String getSchemaFileName() { + return null; + } + + @BeforeClass + public void setUp() throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServers(1); + + _schema = new Schema(); + FieldSpec jsonFieldSpec = new DimensionFieldSpec(); + jsonFieldSpec.setDataType(DataType.BYTES); + jsonFieldSpec.setDefaultNullValue("{}".getBytes()); + jsonFieldSpec.setObjectType("JSON"); + jsonFieldSpec.setName("headers"); + jsonFieldSpec.setSingleValueField(true); + _schema.addField(jsonFieldSpec); + + // Create the tables + ArrayList<String> invertedIndexColumns = Lists.newArrayList("headers"); + addOfflineTable(DEFAULT_TABLE_NAME, null, null, null, null, null, SegmentVersion.v1, + invertedIndexColumns, null, null); + + setUpSegmentsAndQueryGenerator(); + + // Wait for all documents loaded + _currentTable = DEFAULT_TABLE_NAME; + waitForAllDocsLoaded(10_000); + } + + @Override + protected long getCountStarResult() { + return TOTAL_DOCS; + } + + protected void setUpSegmentsAndQueryGenerator() throws Exception { + org.apache.avro.Schema avroSchema = AvroWriter.getAvroSchema(_schema); + DataFileWriter recordWriter = + new DataFileWriter<>(new GenericDatumWriter<GenericData.Record>(avroSchema)); + String parent = "/tmp/luceneTest"; + File avroFile = new File(parent, "part-" + 0 + ".avro"); + avroFile.getParentFile().mkdirs(); + recordWriter.create(avroSchema, avroFile); + ObjectMapper mapper = new ObjectMapper(); + 
for (int i = 0; i < TOTAL_DOCS; i++) { + ObjectNode objectNode = JsonNodeFactory.instance.objectNode(); + objectNode.put("k1", "value" + i); + objectNode.put("k2", "value" + i); + String json = mapper.writeValueAsString(objectNode); + GenericData.Record record = new GenericData.Record(avroSchema); + ByteBuffer byteBuffer = ByteBuffer.wrap(json.getBytes()); + record.put("headers", byteBuffer); + recordWriter.append(record); + } + recordWriter.close(); + + // The single Avro file written above is the only input for segment creation + List<File> avroFiles = Lists.newArrayList(avroFile); + + // Create and upload segments without star tree indexes from Avro data + createAndUploadSegments(avroFiles, DEFAULT_TABLE_NAME, false); + + } + + private void createAndUploadSegments(List<File> avroFiles, String tableName, + boolean createStarTreeIndex) throws Exception { + System.out.println("SEGMENT_DIR" + _segmentDir); + TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir); + + ExecutorService executor = Executors.newCachedThreadPool(); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName, + createStarTreeIndex, null, null, _schema, executor); + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.MINUTES); + + uploadSegments(_tarDir); + } + + @Test + public void testQueries() throws Exception { + String pqlQuery = + "Select count(*) from " + DEFAULT_TABLE_NAME + " WHERE headers matches('k1:\"value\\-1\"','')"; + JsonNode pinotResponse = postQuery(pqlQuery); + System.out.println(pinotResponse); + } + + @AfterClass + public void tearDown() throws Exception { + dropOfflineTable(DEFAULT_TABLE_NAME); + + stopServer(); + stopBroker(); + stopController(); + stopZk(); + + FileUtils.deleteDirectory(_tempDir); + } + +} diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml index 68393be..dd795c1 100644 --- a/pinot-perf/pom.xml +++ b/pinot-perf/pom.xml @@ -19,7 +19,8 @@ under the License.
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -101,6 +102,25 @@ </exclusions> </dependency> <dependency> + <groupId>net.sf.jopt-simple</groupId> + <artifactId>jopt-simple</artifactId> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-core</artifactId> + <version>7.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-queryparser</artifactId> + <version>7.6.0</version> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-analyzers-common</artifactId> + <version>7.6.0</version> + </dependency> + <dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-generator-annprocess</artifactId> <version>1.15</version> @@ -180,15 +200,20 @@ <binFileExtensions> <unix>.sh</unix> </binFileExtensions> - <!-- Set the target configuration directory to be used in the bin scripts --> + <!-- Set the target configuration directory to be used in the bin + scripts --> <configurationDirectory>conf</configurationDirectory> - <!-- Copy the contents from "/src/main/config" to the target configuration directory in the assembled application --> + <!-- Copy the contents from "/src/main/config" to the target configuration + directory in the assembled application --> <copyConfigurationDirectory>false</copyConfigurationDirectory> - <!-- Include the target configuration directory in the beginning of the classpath declaration in the bin scripts --> + <!-- Include the target configuration directory in the beginning + of the classpath declaration in the bin scripts --> 
<includeConfigurationDirectoryInClasspath>false</includeConfigurationDirectoryInClasspath> <assembleDirectory>${project.build.directory}/${project.artifactId}-pkg</assembleDirectory> <!-- Extra JVM arguments that will be included in the bin scripts --> - <extraJvmArguments>-Xms24G -Xmx24G -Dlog4j.configuration=log4j.properties</extraJvmArguments> + <extraJvmArguments>-Xms24G -Xmx24G + -Dlog4j.configuration=log4j.properties + </extraJvmArguments> <!-- Generate bin scripts for windows and unix pr default --> <platforms> <platform>unix</platform> diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/LuceneBenchmark.java b/pinot-perf/src/main/java/org/apache/pinot/perf/LuceneBenchmark.java new file mode 100644 index 0000000..2af2ed5 --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/LuceneBenchmark.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.perf; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.Term; +import org.apache.lucene.queryparser.classic.ParseException; +import org.apache.lucene.queryparser.classic.QueryParser; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.WildcardQuery; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMDirectory; +import org.apache.pinot.core.segment.index.readers.LuceneInvertedIndexReader; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + +public class LuceneBenchmark { + + public static void main(String[] args) throws IOException, ParseException { + Map<String, Analyzer> analyzerMap = new HashMap<>(); + Analyzer wrapper = new PerFieldAnalyzerWrapper(new StandardAnalyzer()); + Directory index = new RAMDirectory(); + IndexWriterConfig config = new IndexWriterConfig(wrapper); + + try (IndexWriter writer = new IndexWriter(index, config)) { + for (int i = 0; i < 1000; i++) { + Document doc = new Document(); + doc.add(new TextField("k1", "value-" + i, Field.Store.YES)); + doc.add(new TextField("k2", "value-" + i, Field.Store.YES)); + writer.addDocument(doc); + } + } + String querystr; + querystr = "k1:\"value1?0\""; + Query q = new QueryParser("Content", wrapper).parse(querystr); + q = new WildcardQuery(new Term("k1", 
QueryParser.escape("value1*"))); + + // 3. searching + IndexReader reader = DirectoryReader.open(index); + IndexSearcher searcher = new IndexSearcher(reader); + MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); + Collector collector = new LuceneInvertedIndexReader.LuceneResultCollector(bitmap); + searcher.search(q, collector); + + // 4. display results + System.out.println("Query string: " + querystr); + System.out.println("Found " + bitmap.getCardinality() + " hits."); + + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java index 838b6d1..c299257 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/ZookeeperLauncher.java @@ -36,7 +36,7 @@ public class ZookeeperLauncher { private ZkServer _zkServer; public ZookeeperLauncher() { - this("/tmp"); + this(org.apache.commons.io.FileUtils.getTempDirectoryPath()); } public ZookeeperLauncher(String baseTempDir) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org