[CARBONDATA-2454][DataMap] Add fpp property for bloom datamap. Add an fpp (false positive probability) property to configure the bloom filter that is used by the bloom datamap.
This closes #2279 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6b949716 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6b949716 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6b949716 Branch: refs/heads/spark-2.3 Commit: 6b949716bc3263dd3c3c80ba5097b3885f0af208 Parents: 2c0fa10 Author: xuchuanyin <xuchuan...@hust.edu.cn> Authored: Tue May 8 12:00:39 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed May 9 10:23:58 2018 +0800 ---------------------------------------------------------------------- .../bloom/BloomCoarseGrainDataMapFactory.java | 46 +++++++++++++++++++- .../datamap/bloom/BloomDataMapRefresher.java | 6 ++- .../datamap/bloom/BloomDataMapWriter.java | 7 ++- 3 files changed, 53 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b949716/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 2d43c40..4e62526 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -65,9 +65,18 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa * and all the indexed value is distinct. 
*/ private static final int DEFAULT_BLOOM_FILTER_SIZE = 32000 * 20; + /** + * property for fpp(false-positive-probability) of bloom filter + */ + private static final String BLOOM_FPP = "bloom_fpp"; + /** + * default value for fpp of bloom filter + */ + private static final double DEFAULT_BLOOM_FILTER_FPP = 0.00001d; private DataMapMeta dataMapMeta; private String dataMapName; private int bloomFilterSize; + private double bloomFilterFpp; public BloomCoarseGrainDataMapFactory(CarbonTable carbonTable, DataMapSchema dataMapSchema) throws MalformedDataMapCommandException { @@ -79,6 +88,7 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa List<CarbonColumn> indexedColumns = carbonTable.getIndexedColumns(dataMapSchema); this.bloomFilterSize = validateAndGetBloomFilterSize(dataMapSchema); + this.bloomFilterFpp = validateAndGetBloomFilterFpp(dataMapSchema); List<ExpressionType> optimizedOperations = new ArrayList<ExpressionType>(); // todo: support more optimize operations optimizedOperations.add(ExpressionType.EQUALS); @@ -118,19 +128,51 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa return bloomFilterSize; } + /** + * validate bloom DataMap BLOOM_FPP + * 1. BLOOM_FPP property is optional, 0.00001 will be the default value. + * 2. 
BLOOM_FPP should be (0, 1) + */ + private double validateAndGetBloomFilterFpp(DataMapSchema dmSchema) + throws MalformedDataMapCommandException { + String bloomFilterFppStr = dmSchema.getProperties().get(BLOOM_FPP); + if (StringUtils.isBlank(bloomFilterFppStr)) { + LOGGER.warn( + String.format("Bloom filter FPP is not configured for datamap %s, use default value %f", + dataMapName, DEFAULT_BLOOM_FILTER_FPP)); + return DEFAULT_BLOOM_FILTER_FPP; + } + double bloomFilterFpp; + try { + bloomFilterFpp = Double.parseDouble(bloomFilterFppStr); + } catch (NumberFormatException e) { + throw new MalformedDataMapCommandException( + String.format("Invalid value of bloom filter fpp '%s', it should be an numeric", + bloomFilterFppStr)); + } + if (bloomFilterFpp < 0 || bloomFilterFpp - 1 >= 0) { + throw new MalformedDataMapCommandException( + String.format("Invalid value of bloom filter fpp '%s', it should be in range 0~1", + bloomFilterFppStr)); + } + return bloomFilterFpp; + } + @Override public DataMapWriter createWriter(Segment segment, String shardName) throws IOException { LOGGER.info( String.format("Data of BloomCoarseGranDataMap %s for table %s will be written to %s", this.dataMapName, getCarbonTable().getTableName() , shardName)); return new BloomDataMapWriter(getCarbonTable().getTablePath(), this.dataMapName, - this.dataMapMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize); + this.dataMapMeta.getIndexedColumns(), segment, shardName, + this.bloomFilterSize, this.bloomFilterFpp); } @Override public DataMapRefresher createRefresher(Segment segment, String shardName) throws IOException { return new BloomDataMapRefresher(getCarbonTable().getTablePath(), this.dataMapName, - this.dataMapMeta.getIndexedColumns(), segment, shardName, this.bloomFilterSize); + this.dataMapMeta.getIndexedColumns(), segment, shardName, + this.bloomFilterSize, this.bloomFilterFpp); } @Override 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b949716/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java index cb86c39..8e05133 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapRefresher.java @@ -36,8 +36,10 @@ import org.apache.carbondata.core.util.CarbonUtil; public class BloomDataMapRefresher extends BloomDataMapWriter implements DataMapRefresher { BloomDataMapRefresher(String tablePath, String dataMapName, List<CarbonColumn> indexColumns, - Segment segment, String shardName, int bloomFilterSize) throws IOException { - super(tablePath, dataMapName, indexColumns, segment, shardName, bloomFilterSize); + Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp) + throws IOException { + super(tablePath, dataMapName, indexColumns, segment, shardName, + bloomFilterSize, bloomFilterFpp); } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b949716/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java index 4e07182..a55de11 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java @@ -50,6 +50,7 @@ public class BloomDataMapWriter extends DataMapWriter { private 
static final LogService LOG = LogServiceFactory.getLogService( BloomDataMapWriter.class.getCanonicalName()); private int bloomFilterSize; + private double bloomFilterFpp; protected int currentBlockletId; private List<String> currentDMFiles; private List<DataOutputStream> currentDataOutStreams; @@ -57,9 +58,11 @@ public class BloomDataMapWriter extends DataMapWriter { protected List<BloomFilter<byte[]>> indexBloomFilters; BloomDataMapWriter(String tablePath, String dataMapName, List<CarbonColumn> indexColumns, - Segment segment, String shardName, int bloomFilterSize) throws IOException { + Segment segment, String shardName, int bloomFilterSize, double bloomFilterFpp) + throws IOException { super(tablePath, dataMapName, indexColumns, segment, shardName); this.bloomFilterSize = bloomFilterSize; + this.bloomFilterFpp = bloomFilterFpp; currentDMFiles = new ArrayList<String>(indexColumns.size()); currentDataOutStreams = new ArrayList<DataOutputStream>(indexColumns.size()); @@ -86,7 +89,7 @@ public class BloomDataMapWriter extends DataMapWriter { List<CarbonColumn> indexColumns = getIndexColumns(); for (int i = 0; i < indexColumns.size(); i++) { indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(), - bloomFilterSize, 0.00001d)); + bloomFilterSize, bloomFilterFpp)); } }