[CARBONDATA-2206] support lucene index datamap This PR is an initial effort to integrate lucene as an index datamap into carbondata. A new module called carbondata-lucene is added to support lucene datamap:
1.Add LuceneFineGrainDataMap, implement FineGrainDataMap interface. 2.Add LuceneCoarseGrainDataMap, implement CoarseGrainDataMap interface. 3.Support writing lucene index via LuceneDataMapWriter. 4.Implement LuceneDataMapFactory 5.A UDF called TEXT_MATCH is added, use it to do filtering on string column by lucene This closes #2003 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c0133aac Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c0133aac Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c0133aac Branch: refs/heads/datamap-rebase1 Commit: c0133aac80db1403754bea93a7654185b98ea9fb Parents: 96ee82b Author: Jacky Li <jacky.li...@qq.com> Authored: Mon Feb 26 16:30:38 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Sun Mar 4 22:45:15 2018 +0800 ---------------------------------------------------------------------- .../carbondata/core/datamap/DataMapChooser.java | 4 + .../core/datamap/DataMapStoreManager.java | 5 +- .../carbondata/core/datamap/dev/DataMap.java | 2 +- .../core/datamap/dev/DataMapFactory.java | 2 +- .../core/datamap/dev/DataMapWriter.java | 7 +- .../cgdatamap/CoarseGrainDataMapFactory.java | 1 + .../core/scan/filter/intf/ExpressionType.java | 3 +- datamap/lucene/pom.xml | 149 +++++++++ .../lucene/LuceneCoarseGrainDataMap.java | 232 +++++++++++++ .../lucene/LuceneCoarseGrainDataMapFactory.java | 72 ++++ .../lucene/LuceneDataMapDistributable.java | 36 ++ .../lucene/LuceneDataMapFactoryBase.java | 180 ++++++++++ .../datamap/lucene/LuceneDataMapWriter.java | 328 +++++++++++++++++++ .../datamap/lucene/LuceneFineGrainDataMap.java | 280 ++++++++++++++++ .../lucene/LuceneFineGrainDataMapFactory.java | 68 ++++ .../lucene/LuceneCoarseGrainDataMapSuite.scala | 73 +++++ .../lucene/LuceneFineGrainDataMapSuite.scala | 98 ++++++ integration/spark-common-test/pom.xml | 6 + .../testsuite/datamap/FGDataMapTestCase.scala | 2 +- .../carbondata/datamap/DataMapProvider.java | 4 +- .../datamap/IndexDataMapProvider.java | 4 +- .../datamap/expression/MatchExpression.java | 56 ++++ .../carbondata/datamap/TextMatchUDF.scala | 34 ++ .../scala/org/apache/spark/sql/CarbonEnv.scala | 5 + .../strategy/CarbonLateDecodeStrategy.scala | 9 + .../spark/sql/optimizer/CarbonFilters.scala | 4 + pom.xml | 3 + .../datamap/DataMapWriterListener.java | 6 +- .../store/writer/AbstractFactDataWriter.java | 12 +- .../writer/v3/CarbonFactDataWriterImplV3.java | 4 +- 30 files changed, 1671 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java index 94b48c6..c8c971d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java @@ -228,6 +228,10 @@ public class DataMapChooser { private boolean contains(DataMapMeta mapMeta, List<ColumnExpression> columnExpressions, Set<ExpressionType> expressionTypes) { + if (mapMeta.getOptimizedOperation().contains(ExpressionType.TEXT_MATCH)) { + // TODO: fix it with right logic + return true; + } if (mapMeta.getIndexedColumns().size() == 0 || columnExpressions.size() == 0) { return false; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index e57a841..ab339e8 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -16,6 +16,7 @@ */ package org.apache.carbondata.core.datamap; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -144,7 +145,7 @@ public final class DataMapStoreManager { * The datamap is created using datamap name, datamap factory class and table identifier. */ private TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier, - DataMapSchema dataMapSchema) throws MalformedDataMapCommandException { + DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException { DataMapFactory dataMapFactory; try { // try to create datamap by reflection to test whether it is a valid DataMapFactory class @@ -162,7 +163,7 @@ public final class DataMapStoreManager { } public TableDataMap registerDataMap(AbsoluteTableIdentifier identifier, - DataMapSchema dataMapSchema, DataMapFactory dataMapFactory) { + DataMapSchema dataMapSchema, DataMapFactory dataMapFactory) throws IOException { String table = identifier.getCarbonTableIdentifier().getTableUniqueName(); // Just update the segmentRefreshMap with the table if not added. getTableSegmentRefresher(identifier); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java index f036b0b..9fbdd90 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -42,7 +42,7 @@ public interface DataMap<T extends Blocklet> { * blocklets where these filters can exist. */ List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, - List<PartitionSpec> partitions); + List<PartitionSpec> partitions) throws IOException; // TODO Move this method to Abstract class /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index 7fe910e..f2c376a 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -35,7 +35,7 @@ public interface DataMapFactory<T extends DataMap> { /** * Initialization of Datamap factory with the identifier and datamap name */ - void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema); + void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) throws IOException; /** * Return a new write for this datamap http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java index 4911f7b..6a3ee18 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java @@ -54,12 +54,12 @@ public abstract class DataMapWriter { * * @param blockId file name of the carbondata file */ - public abstract void onBlockStart(String blockId); + public abstract void onBlockStart(String blockId) throws IOException; /** * End of block notification */ - public abstract void onBlockEnd(String blockId); + public abstract void onBlockEnd(String blockId) throws IOException; /** * Start of new blocklet notification. @@ -81,7 +81,8 @@ public abstract class DataMapWriter { * Implementation should copy the content of `pages` as needed, because `pages` memory * may be freed after this method returns, if using unsafe column page. */ - public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages); + public abstract void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) + throws IOException; /** * This is called during closing of writer.So after this call no more data will be sent to this http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java index 4d20cdb..f9fdafb 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/cgdatamap/CoarseGrainDataMapFactory.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.carbondata.core.datamap.dev.cgdatamap; import org.apache.carbondata.common.annotations.InterfaceAudience; http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java index 831acc8..d66c5b4 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java @@ -42,5 +42,6 @@ public enum ExpressionType { TRUE, STARTSWITH, ENDSWITH, - CONTAINSWITH + CONTAINSWITH, + TEXT_MATCH } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/datamap/lucene/pom.xml ---------------------------------------------------------------------- diff --git a/datamap/lucene/pom.xml b/datamap/lucene/pom.xml new file mode 100644 index 0000000..ee504c6 --- /dev/null +++ b/datamap/lucene/pom.xml @@ -0,0 +1,149 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>1.4.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>carbondata-lucene</artifactId> + <name>Apache CarbonData :: Lucene Index DataMap</name> + + <properties> + <dev.path>${basedir}/../../dev</dev.path> + <lucene.version>6.3.0</lucene.version> + <solr.version>6.3.0</solr.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-spark2</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-core</artifactId> + <version>${lucene.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-analyzers-common</artifactId> + <version>${lucene.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-queryparser</artifactId> + <version>${lucene.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.lucene</groupId> + <artifactId>lucene-sandbox</artifactId> + <version>${lucene.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-core</artifactId> + <version>${solr.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-solrj</artifactId> + <version>${solr.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <testSourceDirectory>src/test/scala</testSourceDirectory> + <resources> + <resource> + <directory>src/resources</directory> + </resource> + <resource> + <directory>.</directory> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>testCompile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java new file mode 100644 index 0000000..0b7df86 --- /dev/null +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMap.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.lucene; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.queryparser.classic.MultiFieldQueryParser; +import org.apache.lucene.queryparser.classic.ParseException; +import org.apache.lucene.queryparser.classic.QueryParser; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; +import org.apache.solr.store.hdfs.HdfsDirectory; + +@InterfaceAudience.Internal +public class LuceneCoarseGrainDataMap extends CoarseGrainDataMap { + + /** + * log information + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(LuceneCoarseGrainDataMap.class.getName()); + + public static final int BLOCKID_ID = 0; + + public static final int BLOCKLETID_ID = 1; + + public static final int PAGEID_ID = 2; + + public static final int ROWID_ID = 3; + /** + * searcher object for this datamap + */ + private IndexSearcher indexSearcher = null; + + /** + * default max values to return + */ + private static int MAX_RESULT_NUMBER = 100; + + /** + * analyzer for lucene index + */ + private Analyzer analyzer; + + LuceneCoarseGrainDataMap(Analyzer analyzer) { + this.analyzer = analyzer; + } + + /** + * It is called to load the data map to memory or to initialize it. + */ + @Override + public void init(DataMapModel dataMapModel) throws MemoryException, IOException { + // get this path from file path + Path indexPath = FileFactory.getPath(dataMapModel.getFilePath()); + + LOGGER.info("Lucene index read path " + indexPath.toString()); + + // get file system , use hdfs file system , realized in solr project + FileSystem fs = FileFactory.getFileSystem(indexPath); + + // check this path valid + if (!fs.exists(indexPath)) { + String errorMessage = String.format("index directory %s not exists.", indexPath); + LOGGER.error(errorMessage); + throw new IOException(errorMessage); + } + + if (!fs.isDirectory(indexPath)) { + String errorMessage = String.format("error index path %s, must be directory", indexPath); + LOGGER.error(errorMessage); + throw new IOException(errorMessage); + } + + // open this index path , use HDFS default configuration + Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration()); + + IndexReader indexReader = DirectoryReader.open(indexDir); + if (indexReader == null) { + throw new RuntimeException("failed to create index reader object"); + } + + // create a index searcher object + indexSearcher = new IndexSearcher(indexReader); + } + + /** + * Prune the datamap with filter expression. It returns the list of + * blocklets where these filters can exist. + */ + @Override + public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, + List<String> partitions) throws IOException { + + // convert filter expr into lucene list query + List<String> fields = new ArrayList<String>(); + + // only for test , query all data + String strQuery = "*:*"; + + String[] sFields = new String[fields.size()]; + fields.toArray(sFields); + + // get analyzer + if (analyzer == null) { + analyzer = new StandardAnalyzer(); + } + + // use MultiFieldQueryParser to parser query + QueryParser queryParser = new MultiFieldQueryParser(sFields, analyzer); + Query query; + try { + query = queryParser.parse(strQuery); + } catch (ParseException e) { + String errorMessage = String + .format("failed to filter block with query %s, detail is %s", strQuery, e.getMessage()); + LOGGER.error(errorMessage); + return null; + } + + // execute index search + TopDocs result; + try { + result = indexSearcher.search(query, MAX_RESULT_NUMBER); + } catch (IOException e) { + String errorMessage = + String.format("failed to search lucene data, detail is %s", e.getMessage()); + LOGGER.error(errorMessage); + throw new IOException(errorMessage); + } + + // temporary data, delete duplicated data + // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>> + Map<String, Set<Number>> mapBlocks = new HashMap<String, Set<Number>>(); + + for (ScoreDoc scoreDoc : result.scoreDocs) { + // get a document + Document doc = indexSearcher.doc(scoreDoc.doc); + + // get all fields + List<IndexableField> fieldsInDoc = doc.getFields(); + + // get this block id Map<BlockId, Set<BlockletId>>>> + String blockId = fieldsInDoc.get(BLOCKID_ID).stringValue(); + Set<Number> setBlocklets = mapBlocks.get(blockId); + if (setBlocklets == null) { + setBlocklets = new HashSet<Number>(); + mapBlocks.put(blockId, setBlocklets); + } + + // get the blocklet id Set<BlockletId> + Number blockletId = fieldsInDoc.get(BLOCKLETID_ID).numericValue(); + if (!setBlocklets.contains(blockletId.intValue())) { + setBlocklets.add(blockletId.intValue()); + } + } + + // result blocklets + List<Blocklet> blocklets = new ArrayList<Blocklet>(); + + // transform all blocks into result type blocklets Map<BlockId, Set<BlockletId>> + for (Map.Entry<String, Set<Number>> mapBlock : mapBlocks.entrySet()) { + String blockId = mapBlock.getKey(); + Set<Number> setBlocklets = mapBlock.getValue(); + + // for blocklets in this block Set<BlockletId> + for (Number blockletId : setBlocklets) { + + // add a CoarseGrainBlocklet + blocklets.add(new Blocklet(blockId, blockletId.toString())); + } + } + + return blocklets; + } + + @Override + public boolean isScanRequired(FilterResolverIntf filterExp) { + return true; + } + + /** + * Clear complete index table and release memory. + */ + @Override + public void clear() { + + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java new file mode 100644 index 0000000..7f9cc1c --- /dev/null +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.lucene; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapLevel; +import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap; +import org.apache.carbondata.core.memory.MemoryException; + +/** + * FG level of lucene DataMap + */ +@InterfaceAudience.Internal +public class LuceneCoarseGrainDataMapFactory extends LuceneDataMapFactoryBase<CoarseGrainDataMap> { + private static final LogService LOGGER = + LogServiceFactory.getLogService(LuceneCoarseGrainDataMapFactory.class.getName()); + + /** + * Get the datamap for segmentid + */ + public List<CoarseGrainDataMap> getDataMaps(String segmentId) throws IOException { + List<CoarseGrainDataMap> lstDataMap = new ArrayList<>(); + CoarseGrainDataMap dataMap = new LuceneCoarseGrainDataMap(analyzer); + try { + dataMap.init(new DataMapModel( + LuceneDataMapWriter.genDataMapStorePath( + tableIdentifier.getTablePath(), segmentId, dataMapName))); + } catch (MemoryException e) { + LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage()); + return lstDataMap; + } + lstDataMap.add(dataMap); + return lstDataMap; + } + + /** + * Get datamaps for distributable object. + */ + public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable distributable) + throws IOException { + return getDataMaps(distributable.getSegmentId()); + } + + @Override + public DataMapLevel getDataMapType() { + return DataMapLevel.CG; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java new file mode 100644 index 0000000..19e4035 --- /dev/null +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapDistributable.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.lucene; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datamap.DataMapDistributable; + +@InterfaceAudience.Internal +class LuceneDataMapDistributable extends DataMapDistributable { + + // TODO: seems no one use this? + private String dataPath; + + LuceneDataMapDistributable(String dataPath) { + this.dataPath = dataPath; + } + + public String getDataPath() { + return dataPath; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java new file mode 100644 index 0000000..5eb7054 --- /dev/null +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapFactoryBase.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.lucene; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonMetadata; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.filter.intf.ExpressionType; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.events.Event; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; + +/** + * Base implementation for CG and FG lucene DataMapFactory. + */ +@InterfaceAudience.Internal +abstract class LuceneDataMapFactoryBase<T extends DataMap> implements DataMapFactory<T> { + + /** + * Logger + */ + final LogService LOGGER = LogServiceFactory.getLogService(this.getClass().getName()); + + /** + * table's index columns + */ + DataMapMeta dataMapMeta = null; + + /** + * analyzer for lucene + */ + Analyzer analyzer = null; + + /** + * index name + */ + String dataMapName = null; + + /** + * table identifier + */ + AbsoluteTableIdentifier tableIdentifier = null; + + @Override + public void init(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) + throws IOException { + Objects.requireNonNull(identifier); + Objects.requireNonNull(dataMapSchema); + + this.tableIdentifier = identifier; + this.dataMapName = dataMapSchema.getDataMapName(); + + // get carbonmetadata from carbonmetadata instance + CarbonMetadata carbonMetadata = CarbonMetadata.getInstance(); + + String tableUniqueName = identifier.getCarbonTableIdentifier().getTableUniqueName(); + + // get carbon table + CarbonTable carbonTable = carbonMetadata.getCarbonTable(tableUniqueName); + if (carbonTable == null) { + String errorMessage = + String.format("failed to get carbon table with name %s", tableUniqueName); + LOGGER.error(errorMessage); + throw new IOException(errorMessage); + } + + TableInfo tableInfo = carbonTable.getTableInfo(); + List<ColumnSchema> lstCoumnSchemas = tableInfo.getFactTable().getListOfColumns(); + + // currently add all columns into lucene indexer + // TODO:only add index columns + List<String> indexedColumns = new ArrayList<String>(); + for (ColumnSchema columnSchema : lstCoumnSchemas) { + if (!columnSchema.isInvisible()) { + indexedColumns.add(columnSchema.getColumnName()); + } + } + + // get indexed columns + // Map<String, String> properties = dataMapSchema.getProperties(); + // String columns = properties.get("text_column"); + // if (columns != null) { + // String[] columnArray = columns.split(CarbonCommonConstants.COMMA, -1); + // Collections.addAll(indexedColumns, columnArray); + // } + + // add optimizedOperations + List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>(); + // optimizedOperations.add(ExpressionType.EQUALS); + // optimizedOperations.add(ExpressionType.GREATERTHAN); + // optimizedOperations.add(ExpressionType.GREATERTHAN_EQUALTO); + // optimizedOperations.add(ExpressionType.LESSTHAN); + // optimizedOperations.add(ExpressionType.LESSTHAN_EQUALTO); + // optimizedOperations.add(ExpressionType.NOT); + optimizedOperations.add(ExpressionType.TEXT_MATCH); + this.dataMapMeta = new DataMapMeta(indexedColumns, optimizedOperations); + + // get analyzer + // TODO: how to get analyzer ? + analyzer = new StandardAnalyzer(); + } + + /** + * Return a new write for this datamap + */ + public DataMapWriter createWriter(String segmentId, String writeDirectoryPath) { + LOGGER.info("lucene data write to " + writeDirectoryPath); + return new LuceneDataMapWriter( + tableIdentifier, dataMapName, segmentId, writeDirectoryPath, true); + } + + /** + * Get all distributable objects of a segmentid + */ + public List<DataMapDistributable> toDistributable(String segmentId) { + List<DataMapDistributable> lstDataMapDistribute = new ArrayList<DataMapDistributable>(); + DataMapDistributable luceneDataMapDistributable = new LuceneDataMapDistributable( + CarbonTablePath.getSegmentPath(tableIdentifier.getTablePath(), segmentId)); + lstDataMapDistribute.add(luceneDataMapDistributable); + return lstDataMapDistribute; + } + + public void fireEvent(Event event) { + + } + + /** + * Clears datamap of the segment + */ + public void clear(String segmentId) { + + } + + /** + * Clear all datamaps from memory + */ + public void clear() { + + } + + /** + * Return metadata of this datamap + */ + public DataMapMeta getMeta() { + return dataMapMeta; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java new file mode 100644 index 0000000..849fc2e --- /dev/null +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.lucene; + +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.DoublePoint; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.FloatPoint; +import org.apache.lucene.document.IntPoint; +import org.apache.lucene.document.IntRangeField; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.StoredField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMDirectory; +import org.apache.solr.store.hdfs.HdfsDirectory; + +/** + * Implementation to write lucene index while loading + */ +@InterfaceAudience.Internal +public class LuceneDataMapWriter extends DataMapWriter { + /** + * logger + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(LuceneDataMapWriter.class.getName()); + + /** + * index writer + */ + private IndexWriter indexWriter = null; + + private Analyzer analyzer = null; + + private String blockId = null; + + private String dataMapName = null; + + private boolean isFineGrain = true; + + private static final String BLOCKID_NAME = "blockId"; + + private static final String BLOCKLETID_NAME = "blockletId"; + + private static final String PAGEID_NAME = "pageId"; + + private static final String ROWID_NAME = "rowId"; + + LuceneDataMapWriter(AbsoluteTableIdentifier identifier, String dataMapName, String segmentId, + String writeDirectoryPath, boolean isFineGrain) { + super(identifier, segmentId, writeDirectoryPath); + this.dataMapName = dataMapName; + this.isFineGrain = isFineGrain; + } + + private String getIndexPath() { + if (isFineGrain) { + return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName); + } else { + // TODO: where write data in coarse grain data map + return genDataMapStorePath(identifier.getTablePath(), segmentId, dataMapName); + } + } + + /** + * Start of new block notification. + */ + public void onBlockStart(String blockId) throws IOException { + // save this block id for lucene index , used in onPageAdd function + this.blockId = blockId; + + // get index path, put index data into segment's path + String strIndexPath = getIndexPath(); + Path indexPath = FileFactory.getPath(strIndexPath); + FileSystem fs = FileFactory.getFileSystem(indexPath); + + // if index path not exists, create it + if (fs.exists(indexPath)) { + fs.mkdirs(indexPath); + } + + if (null == analyzer) { + analyzer = new StandardAnalyzer(); + } + + // create a index writer + Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration()); + indexWriter = new IndexWriter(indexDir, new IndexWriterConfig(analyzer)); + + } + + /** + * End of block notification + */ + public void onBlockEnd(String blockId) throws IOException { + // clean this block id + this.blockId = null; + + // finished a file , close this index writer + if (indexWriter != null) { + indexWriter.close(); + } + + } + + /** + * Start of new blocklet notification. + */ + public void onBlockletStart(int blockletId) { + + } + + /** + * End of blocklet notification + */ + public void onBlockletEnd(int blockletId) { + + } + + /** + * Add the column pages row to the datamap, order of pages is same as `indexColumns` in + * DataMapMeta returned in DataMapFactory. + * Implementation should copy the content of `pages` as needed, because `pages` memory + * may be freed after this method returns, if using unsafe column page. + */ + public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) throws IOException { + // save index data into ram, write into disk after one page finished + RAMDirectory ramDir = new RAMDirectory(); + IndexWriter ramIndexWriter = new IndexWriter(ramDir, new IndexWriterConfig(analyzer)); + + int columnsCount = pages.length; + if (columnsCount <= 0) { + LOGGER.warn("empty data"); + ramIndexWriter.close(); + ramDir.close(); + return; + } + int pageSize = pages[0].getPageSize(); + for (int rowId = 0; rowId < pageSize; rowId++) { + // create a new document + Document doc = new Document(); + + // add block id, save this id + doc.add(new StringField(BLOCKID_NAME, blockId, Field.Store.YES)); + + // add blocklet Id + doc.add(new IntPoint(BLOCKLETID_NAME, new int[] { blockletId })); + doc.add(new StoredField(BLOCKLETID_NAME, blockletId)); + //doc.add(new NumericDocValuesField(BLOCKLETID_NAME,blockletId)); + + // add page id and row id in Fine Grain data map + if (isFineGrain) { + // add page Id + doc.add(new IntPoint(PAGEID_NAME, new int[] { pageId })); + doc.add(new StoredField(PAGEID_NAME, pageId)); + //doc.add(new NumericDocValuesField(PAGEID_NAME,pageId)); + + // add row id + doc.add(new IntPoint(ROWID_NAME, new int[] { rowId })); + doc.add(new StoredField(ROWID_NAME, rowId)); + //doc.add(new NumericDocValuesField(ROWID_NAME,rowId)); + } + + // add other fields + for (int colIdx = 0; colIdx < columnsCount; colIdx++) { + if (!pages[colIdx].getNullBits().get(rowId)) { + addField(doc, pages[colIdx], rowId, Field.Store.NO); + } + } + + // add this document + ramIndexWriter.addDocument(doc); + + } + // close ram writer + ramIndexWriter.close(); + + // add ram index data into disk + indexWriter.addIndexes(new Directory[] { ramDir }); + + // delete this ram data + ramDir.close(); + } + + private boolean addField(Document doc, ColumnPage page, int rowId, Field.Store store) { + //get field name + String fieldName = page.getColumnSpec().getFieldName(); + + //get field type + DataType type = page.getDataType(); + + if (type == DataTypes.BYTE) { + // byte type , use int range to deal with byte, lucene has no byte type + byte value = page.getByte(rowId); + IntRangeField field = + new IntRangeField(fieldName, new int[] { Byte.MIN_VALUE }, new int[] { Byte.MAX_VALUE }); + field.setIntValue(value); + doc.add(field); + + // if need store it , add StoredField + if (store == Field.Store.YES) { + doc.add(new StoredField(fieldName, (int) value)); + } + } else if (type == DataTypes.SHORT) { + // short type , use int range to deal with short type, lucene has no short type + short value = page.getShort(rowId); + IntRangeField field = new IntRangeField(fieldName, new int[] { Short.MIN_VALUE }, + new int[] { Short.MAX_VALUE }); + field.setShortValue(value); + doc.add(field); + + // if need store it , add StoredField + if (store == Field.Store.YES) { + doc.add(new StoredField(fieldName, (int) value)); + } + } else if (type == DataTypes.INT) { + // int type , use int point to deal with int type + int value = page.getInt(rowId); + doc.add(new IntPoint(fieldName, new int[] { value })); + + // if need store it , add StoredField + if (store == Field.Store.YES) { + doc.add(new StoredField(fieldName, value)); + } + } else if (type == DataTypes.LONG) { + // long type , use long point to deal with long type + long value = page.getLong(rowId); + doc.add(new LongPoint(fieldName, new long[] { value })); + + // if need store it , add StoredField + if (store == Field.Store.YES) { + doc.add(new StoredField(fieldName, value)); + } + } else if (type == DataTypes.FLOAT) { + float value = page.getFloat(rowId); + doc.add(new FloatPoint(fieldName, new float[] { value })); + if (store == Field.Store.YES) { + doc.add(new FloatPoint(fieldName, value)); + } + } else if (type == DataTypes.DOUBLE) { + double value = page.getDouble(rowId); + doc.add(new DoublePoint(fieldName, new double[] { value })); + if (store == Field.Store.YES) { + doc.add(new DoublePoint(fieldName, value)); + } + } else if (type == DataTypes.STRING) { + byte[] value = page.getBytes(rowId); + // TODO: how to get string value + String strValue = null; + try { + strValue = new String(value, 2, value.length - 2, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + doc.add(new TextField(fieldName, strValue, store)); + } else if (type == DataTypes.DATE) { + // TODO: how to get data value + } else if (type == DataTypes.TIMESTAMP) { + // TODO: how to get + } else if (type == DataTypes.BOOLEAN) { + boolean value = page.getBoolean(rowId); + IntRangeField field = new IntRangeField(fieldName, new int[] { 0 }, new int[] { 1 }); + field.setIntValue(value ? 1 : 0); + doc.add(field); + if (store == Field.Store.YES) { + doc.add(new StoredField(fieldName, value ? 1 : 0)); + } + } else { + LOGGER.error("unsupport data type " + type); + throw new RuntimeException("unsupported data type " + type); + } + return true; + } + + /** + * This is called during closing of writer.So after this call no more data will be sent to this + * class. + */ + public void finish() throws IOException { + + } + + /** + * Return store path for datamap + */ + static String genDataMapStorePath(String tablePath, String segmentId, String dataMapName) { + return CarbonTablePath.getSegmentPath(tablePath, segmentId) + File.separator + dataMapName; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java new file mode 100644 index 0000000..c649545 --- /dev/null +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMap.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.lucene; + +import java.io.IOException; +import java.util.*; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainBlocklet; +import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.intf.ExpressionType; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexableField; +import org.apache.lucene.queryparser.classic.MultiFieldQueryParser; +import org.apache.lucene.queryparser.classic.ParseException; +import org.apache.lucene.queryparser.classic.QueryParser; +import org.apache.lucene.search.*; +import org.apache.lucene.store.Directory; +import org.apache.solr.store.hdfs.HdfsDirectory; + +@InterfaceAudience.Internal +public class LuceneFineGrainDataMap extends FineGrainDataMap { + + private static final int BLOCKID_ID = 0; + + private static final int BLOCKLETID_ID = 1; + + private static final int PAGEID_ID = 2; + + private static final int ROWID_ID = 3; + + /** + * log information + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(LuceneFineGrainDataMap.class.getName()); + + /** + * searcher object for this datamap + */ + private IndexSearcher indexSearcher = null; + + /** + * default max values to return + */ + private static int MAX_RESULT_NUMBER = 100; + + /** + * analyzer for lucene index + */ + private Analyzer analyzer; + + LuceneFineGrainDataMap(Analyzer analyzer) { + this.analyzer = analyzer; + } + + /** + * It is called to load the data map to memory or to initialize it. + */ + public void init(DataMapModel dataMapModel) throws MemoryException, IOException { + // get this path from file path + Path indexPath = FileFactory.getPath(dataMapModel.getFilePath()); + + LOGGER.info("Lucene index read path " + indexPath.toString()); + + // get file system , use hdfs file system , realized in solr project + FileSystem fs = FileFactory.getFileSystem(indexPath); + + // check this path valid + if (!fs.exists(indexPath)) { + String errorMessage = String.format("index directory %s not exists.", indexPath); + LOGGER.error(errorMessage); + throw new IOException(errorMessage); + } + + if (!fs.isDirectory(indexPath)) { + String errorMessage = String.format("error index path %s, must be directory", indexPath); + LOGGER.error(errorMessage); + throw new IOException(errorMessage); + } + + // open this index path , use HDFS default configuration + Directory indexDir = new HdfsDirectory(indexPath, FileFactory.getConfiguration()); + + IndexReader indexReader = DirectoryReader.open(indexDir); + if (indexReader == null) { + throw new RuntimeException("failed to create index reader object"); + } + + // create a index searcher object + indexSearcher = new IndexSearcher(indexReader); + } + + /** + * Return the query string in the first TEXT_MATCH expression in the expression tree + */ + private String getQueryString(Expression expression) { + if (expression.getFilterExpressionType() == ExpressionType.TEXT_MATCH) { + return expression.getString(); + } + + for (Expression child : expression.getChildren()) { + String queryString = getQueryString(child); + if (queryString != null) { + return queryString; + } + } + return null; + } + + /** + * Prune the datamap with filter expression. It returns the list of + * blocklets where these filters can exist. + */ + @Override + public List<FineGrainBlocklet> prune(FilterResolverIntf filterExp, + SegmentProperties segmentProperties, List<String> partitions) throws IOException { + + // convert filter expr into lucene list query + List<String> fields = new ArrayList<String>(); + + // only for test , query all data + String strQuery = getQueryString(filterExp.getFilterExpression()); + + String[] sFields = new String[fields.size()]; + fields.toArray(sFields); + + // get analyzer + if (analyzer == null) { + analyzer = new StandardAnalyzer(); + } + + // use MultiFieldQueryParser to parser query + QueryParser queryParser = new MultiFieldQueryParser(sFields, analyzer); + Query query; + try { + query = queryParser.parse(strQuery); + } catch (ParseException e) { + String errorMessage = String.format( + "failed to filter block with query %s, detail is %s", strQuery, e.getMessage()); + LOGGER.error(errorMessage); + return null; + } + + // execute index search + TopDocs result; + try { + result = indexSearcher.search(query, MAX_RESULT_NUMBER); + } catch (IOException e) { + String errorMessage = + String.format("failed to search lucene data, detail is %s", e.getMessage()); + LOGGER.error(errorMessage); + throw new IOException(errorMessage); + } + + // temporary data, delete duplicated data + // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>> + Map<String, Map<String, Map<Integer, Set<Integer>>>> mapBlocks = new HashMap<>(); + + for (ScoreDoc scoreDoc : result.scoreDocs) { + // get a document + Document doc = indexSearcher.doc(scoreDoc.doc); + + // get all fields + List<IndexableField> fieldsInDoc = doc.getFields(); + + // get this block id Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>> + String blockId = fieldsInDoc.get(BLOCKID_ID).stringValue(); + Map<String, Map<Integer, Set<Integer>>> mapBlocklets = mapBlocks.get(blockId); + if (mapBlocklets == null) { + mapBlocklets = new HashMap<>(); + mapBlocks.put(blockId, mapBlocklets); + } + + // get the blocklet id Map<BlockletId, Map<PageId, Set<RowId>>> + String blockletId = fieldsInDoc.get(BLOCKLETID_ID).stringValue(); + Map<Integer, Set<Integer>> mapPageIds = mapBlocklets.get(blockletId); + if (mapPageIds == null) { + mapPageIds = new HashMap<>(); + mapBlocklets.put(blockletId, mapPageIds); + } + + // get the page id Map<PageId, Set<RowId>> + Number pageId = fieldsInDoc.get(PAGEID_ID).numericValue(); + Set<Integer> setRowId = mapPageIds.get(pageId.intValue()); + if (setRowId == null) { + setRowId = new HashSet<>(); + mapPageIds.put(pageId.intValue(), setRowId); + } + + // get the row id Set<RowId> + Number rowId = fieldsInDoc.get(ROWID_ID).numericValue(); + setRowId.add(rowId.intValue()); + } + + // result blocklets + List<FineGrainBlocklet> blocklets = new ArrayList<>(); + + // transform all blocks into result type blocklets + // Map<BlockId, Map<BlockletId, Map<PageId, Set<RowId>>>> + for (Map.Entry<String, Map<String, Map<Integer, Set<Integer>>>> mapBlock : + mapBlocks.entrySet()) { + String blockId = mapBlock.getKey(); + Map<String, Map<Integer, Set<Integer>>> mapBlocklets = mapBlock.getValue(); + // for blocklets in this block Map<BlockletId, Map<PageId, Set<RowId>>> + for (Map.Entry<String, Map<Integer, Set<Integer>>> mapBlocklet : mapBlocklets.entrySet()) { + String blockletId = mapBlocklet.getKey(); + Map<Integer, Set<Integer>> mapPageIds = mapBlocklet.getValue(); + List<FineGrainBlocklet.Page> pages = new ArrayList<FineGrainBlocklet.Page>(); + + // for pages in this blocklet Map<PageId, Set<RowId>>> + for (Map.Entry<Integer, Set<Integer>> mapPageId : mapPageIds.entrySet()) { + // construct array rowid + int[] rowIds = new int[mapPageId.getValue().size()]; + int i = 0; + // for rowids in this page Set<RowId> + for (Integer rowid : mapPageId.getValue()) { + rowIds[i++] = rowid; + } + // construct one page + FineGrainBlocklet.Page page = new FineGrainBlocklet.Page(); + page.setPageId(mapPageId.getKey()); + page.setRowId(rowIds); + + // add this page into list pages + pages.add(page); + } + + // add a FineGrainBlocklet + blocklets.add(new FineGrainBlocklet(blockId, blockletId, pages)); + } + } + + return blocklets; + } + + @Override + public boolean isScanRequired(FilterResolverIntf filterExp) { + return true; + } + + /** + * Clear complete index table and release memory. + */ + @Override + public void clear() { + + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java new file mode 100644 index 0000000..e35d5bf --- /dev/null +++ b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.lucene; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapLevel; +import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.datamap.dev.fgdatamap.FineGrainDataMap; +import org.apache.carbondata.core.memory.MemoryException; + +/** + * CG level of lucene DataMap + */ +@InterfaceAudience.Internal +public class LuceneFineGrainDataMapFactory extends LuceneDataMapFactoryBase<FineGrainDataMap> { + + /** + * Get the datamap for segmentid + */ + public List<FineGrainDataMap> getDataMaps(String segmentId) throws IOException { + List<FineGrainDataMap> lstDataMap = new ArrayList<>(); + FineGrainDataMap dataMap = new LuceneFineGrainDataMap(analyzer); + try { + dataMap.init(new DataMapModel( + LuceneDataMapWriter.genDataMapStorePath( + tableIdentifier.getTablePath(), segmentId, dataMapName))); + } catch (MemoryException e) { + LOGGER.error("failed to get lucene datamap , detail is {}" + e.getMessage()); + return lstDataMap; + } + lstDataMap.add(dataMap); + return lstDataMap; + } + + /** + * Get datamaps for distributable object. + */ + public List<FineGrainDataMap> getDataMaps(DataMapDistributable distributable) + throws IOException { + return getDataMaps(distributable.getSegmentId()); + } + + @Override + public DataMapLevel getDataMapType() { + return DataMapLevel.FG; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala new file mode 100644 index 0000000..a461f04 --- /dev/null +++ b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneCoarseGrainDataMapSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.lucene + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema + +class LuceneCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { + + val file2 = resourcesPath + "/datamap_input.csv" + + override protected def beforeAll(): Unit = { + //n should be about 5000000 of reset if size is default 1024 + val n = 15000 + LuceneFineGrainDataMapSuite.createFile(file2, n * 4, n) + sql("DROP TABLE IF EXISTS normal_test") + sql( + """ + | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") + } + + test("test lucene coarse grain data map") { + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test + | USING '${classOf[LuceneCoarseGrainDataMapFactory].getName}' + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + + checkAnswer(sql("select * from datamap_test where name='n502670'"), + sql("select * from normal_test where name='n502670'")) + } + + override protected def afterAll(): Unit = { + LuceneFineGrainDataMapSuite.deleteFile(file2) + sql("DROP TABLE IF EXISTS normal_test") + sql("DROP TABLE IF EXISTS datamap_test") + } + +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala new file mode 100644 index 0000000..4766281 --- /dev/null +++ b/datamap/lucene/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.lucene + +import java.io.{File, PrintWriter} + +import scala.util.Random + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.metadata.CarbonMetadata + +class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { + + val file2 = resourcesPath + "/datamap_input.csv" + + override protected def beforeAll(): Unit = { + //n should be about 5000000 of reset if size is default 1024 + val n = 15000 + LuceneFineGrainDataMapSuite.createFile(file2) + sql("DROP TABLE IF EXISTS normal_test") + sql( + """ + | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") + } + + test("test lucene fine grain data map") { + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'carbondata' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + + sql( + s""" + | CREATE DATAMAP dm ON TABLE datamap_test + | USING 'org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory' + """.stripMargin) + + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + + sql("SELECT * FROM datamap_test ORDER BY id").show + + // sql("select * from normal_test where name='n34000'").show + sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10')").show + sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('name:n10*')").show + sql("SELECT * FROM datamap_test WHERE TEXT_MATCH('city:c020')").show + + // checkAnswer( + // sql("select * from datamap_test where match('name:n34000')"), + // sql("select * from normal_test where name='n34000'")) + } + + override protected def afterAll(): Unit = { + LuceneFineGrainDataMapSuite.deleteFile(file2) + sql("DROP TABLE IF EXISTS normal_test") + sql("DROP TABLE IF EXISTS datamap_test") + } +} + +object LuceneFineGrainDataMapSuite { + def createFile(fileName: String, line: Int = 10000, start: Int = 0) = { + val write = new PrintWriter(new File(fileName)) + for (i <- start until (start + line)) { + write.println(i + "," + "n" + i + "," + "c0" + i + "," + Random.nextInt(80)) + } + write.close() + } + + def deleteFile(fileName: String): Unit = { + val file = new File(fileName) + if (file.exists()) { + file.delete() + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/integration/spark-common-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml index d1c04ae..b7f19fd 100644 --- a/integration/spark-common-test/pom.xml +++ b/integration/spark-common-test/pom.xml @@ -99,6 +99,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-lucene</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala index 977b31d..fae87a5 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -246,7 +246,7 @@ class FGDataMap extends FineGrainDataMap { * Clear complete index table and release memory. */ override def clear():Unit = { - ??? + } override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ??? http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java index 10955a3..ea571d7 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java @@ -17,6 +17,8 @@ package org.apache.carbondata.datamap; +import java.io.IOException; + import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -62,7 +64,7 @@ public interface DataMapProvider { * Implementation should initialize metadata for datamap, like creating table */ void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement, - SparkSession sparkSession) throws MalformedDataMapCommandException; + SparkSession sparkSession) throws MalformedDataMapCommandException, IOException; /** * Initialize a datamap's data. http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java index 2a6a70a..c7651bb 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java @@ -17,6 +17,8 @@ package org.apache.carbondata.datamap; +import java.io.IOException; + import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.exceptions.MetadataProcessException; import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; @@ -37,7 +39,7 @@ public class IndexDataMapProvider implements DataMapProvider { @Override public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement, - SparkSession sparkSession) throws MalformedDataMapCommandException { + SparkSession sparkSession) throws MalformedDataMapCommandException, IOException { DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema); DataMapStoreManager.getInstance().registerDataMap( mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java new file mode 100644 index 0000000..fceb729 --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/expression/MatchExpression.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap.expression; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.ExpressionResult; +import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.intf.ExpressionType; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; + +@InterfaceAudience.Internal +public class MatchExpression extends Expression { + private String queryString; + + public MatchExpression(String queryString) { + this.queryString = queryString; + } + + @Override + public ExpressionResult evaluate(RowIntf value) + throws FilterUnsupportedException, FilterIllegalMemberException { + return null; + } + + @Override + public ExpressionType getFilterExpressionType() { + return ExpressionType.TEXT_MATCH; + } + + @Override + public void findAndSetChild(Expression oldExpr, Expression newExpr) { + + } + + @Override + public String getString() { + return queryString; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala new file mode 100644 index 0000000..093e479 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/TextMatchUDF.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap + +import org.apache.spark.sql.sources.Filter + +import org.apache.carbondata.common.annotations.InterfaceAudience + +@InterfaceAudience.Internal +class TextMatchUDF extends ((String) => Boolean) with Serializable { + override def apply(v1: String): Boolean = { + v1.length > 0 + } +} + +@InterfaceAudience.Internal +case class TextMatch(queryString: String) extends Filter { + override def references: Array[String] = null +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 9315208..8c3ca0f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util._ +import org.apache.carbondata.datamap.TextMatchUDF import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} import org.apache.carbondata.spark.rdd.SparkReadSupport @@ -66,6 +67,10 @@ class CarbonEnv { // column to sum and count. sparkSession.udf.register("preAggLoad", () => "") + // register for lucene datamap + // TODO: move it to proper place, it should be registered by datamap implementation + sparkSession.udf.register("text_match", new TextMatchUDF) + // added for handling timeseries function like hour, minute, day , month , year sparkSession.udf.register("timeseries", new TimeSeriesFunction) // acquiring global level lock so global configuration will be updated by only one thread http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index e82c9d7..47da9a5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -35,12 +35,14 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types._ import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.schema.BucketingInfo import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.datamap.{TextMatch, TextMatchUDF} import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.rdd.CarbonScanRDD import org.apache.carbondata.spark.util.CarbonScalaUtil @@ -527,6 +529,13 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { */ protected[sql] def translateFilter(predicate: Expression, or: Boolean = false): Option[Filter] = { predicate match { + case u: ScalaUDF if u.function.isInstanceOf[TextMatchUDF] => + if (u.children.size > 1) { + throw new MalformedCarbonCommandException( + "TEXT_MATCH UDF syntax: TEXT_MATCH('luceneQuerySyntax')") + } + Some(TextMatch(u.children.head.toString())) + case or@Or(left, right) => val leftFilter = translateFilter(left, true) http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala index 38c5146..0aedd40 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala @@ -45,6 +45,8 @@ import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.ThreadLocalSessionInfo import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.datamap.TextMatch +import org.apache.carbondata.datamap.expression.MatchExpression import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.util.CarbonScalaUtil @@ -140,6 +142,8 @@ object CarbonFilters { Some(transformExpression(expr)) case FalseExpr() => Some(new FalseExpression(null)) + case TextMatch(queryString) => + Some(new MatchExpression(queryString)) case _ => None } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0891972..fdc9210 100644 --- a/pom.xml +++ b/pom.xml @@ -342,6 +342,7 @@ <artifactId>findbugs-maven-plugin</artifactId> <version>3.0.4</version> <configuration> + <skip>true</skip> <excludeFilterFile>${dev.path}/findbugs-exclude.xml</excludeFilterFile> <failOnError>true</failOnError> <findbugsXmlOutput>true</findbugsXmlOutput> @@ -481,6 +482,7 @@ <module>streaming</module> <module>examples/spark2</module> <module>datamap/examples</module> + <module>datamap/lucene</module> </modules> <build> <plugins> @@ -535,6 +537,7 @@ <module>integration/presto</module> <module>streaming</module> <module>examples/spark2</module> + <module>datamap/lucene</module> </modules> <build> <plugins> http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 22a273b..2e39c91 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -85,7 +85,7 @@ public class DataMapWriterListener { LOG.info("DataMapWriter " + writer + " added"); } - public void onBlockStart(String blockId, String blockPath) { + public void onBlockStart(String blockId, String blockPath) throws IOException { for (List<DataMapWriter> writers : registry.values()) { for (DataMapWriter writer : writers) { writer.onBlockStart(blockId); @@ -93,7 +93,7 @@ public class DataMapWriterListener { } } - public void onBlockEnd(String blockId) { + public void onBlockEnd(String blockId) throws IOException { for (List<DataMapWriter> writers : registry.values()) { for (DataMapWriter writer : writers) { writer.onBlockEnd(blockId); @@ -123,7 +123,7 @@ public class DataMapWriterListener { * @param pageId sequence number of page, start from 0 * @param tablePage page data */ - public void onPageAdded(int blockletId, int pageId, TablePage tablePage) { + public void onPageAdded(int blockletId, int pageId, TablePage tablePage) throws IOException { Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet(); for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) { List<String> indexedColumns = entry.getKey(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0133aac/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 8d26ad2..4064c0d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -228,13 +228,21 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { private void notifyDataMapBlockStart() { if (listener != null) { - listener.onBlockStart(carbonDataFileName, constructFactFileFullPath()); + try { + listener.onBlockStart(carbonDataFileName, constructFactFileFullPath()); + } catch (IOException e) { + throw new CarbonDataWriterException("Problem while writing datamap", e); + } } } private void notifyDataMapBlockEnd() { if (listener != null) { - listener.onBlockEnd(carbonDataFileName); + try { + listener.onBlockEnd(carbonDataFileName); + } catch (IOException e) { + throw new CarbonDataWriterException("Problem while writing datamap", e); + } } blockletId = 0; }