[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-31 Thread manishgupta88
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r214310465
  
--- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java ---
@@ -96,14 +96,35 @@ private static FileFooter3 getFileFooter3(List infoList,
     return footer;
   }
 
-  public static BlockletIndex getBlockletIndex(
-      org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex info) {
+  public static org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex
+      convertExternalMinMaxIndex(BlockletMinMaxIndex minMaxIndex) {
--- End diff --

please add a method comment to explain the meaning of convertExternalMinMaxIndex


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-31 Thread manishgupta88
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r214303472
  
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+
+@InterfaceAudience.Internal
+public class StreamDataMap {
+
+  private CarbonTable carbonTable;
+
+  private AbsoluteTableIdentifier identifier;
--- End diff --

If carbonTable is being stored, then there is no need to store identifier as well; you can get it from carbonTable.
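A minimal sketch of the suggested cleanup (stub types stand in for CarbonTable and AbsoluteTableIdentifier; this is illustrative, not the PR's code): store only the table and derive the identifier from it on demand, so there is no second field to keep in sync.

```java
// Stub standing in for CarbonTable; only the accessor used below is modeled.
class TableStub {
    String getAbsoluteTableIdentifier() {
        return "db/streaming_table";
    }
}

class StreamDataMapSketch {
    private final TableStub carbonTable;

    StreamDataMapSketch(TableStub carbonTable) {
        // only the table is stored; no cached identifier field
        this.carbonTable = carbonTable;
    }

    String identifier() {
        // derive the identifier from the table whenever it is needed
        return carbonTable.getAbsoluteTableIdentifier();
    }
}
```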


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-31 Thread manishgupta88
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r214307411
  
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+
+@InterfaceAudience.Internal
+public class StreamDataMap {
+
+  private CarbonTable carbonTable;
+
+  private AbsoluteTableIdentifier identifier;
+
+  private FilterExecuter filterExecuter;
+
+  public StreamDataMap(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+    this.identifier = carbonTable.getAbsoluteTableIdentifier();
+  }
+
+  public void init(FilterResolverIntf filterExp) {
+    if (filterExp != null) {
+
+      List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
+      for (CarbonDimension dimension : carbonTable.getDimensions()) {
+        if (!dimension.isComplex()) {
+          minMaxCacheColumns.add(dimension);
+        }
+      }
+      minMaxCacheColumns.addAll(carbonTable.getMeasures());
+
+      List<ColumnSchema> listOfColumns =
+          carbonTable.getTableInfo().getFactTable().getListOfColumns();
+      int[] columnCardinality = new int[listOfColumns.size()];
+      for (int index = 0; index < columnCardinality.length; index++) {
+        columnCardinality[index] = Integer.MAX_VALUE;
+      }
+
+      SegmentProperties segmentProperties =
+          new SegmentProperties(listOfColumns, columnCardinality);
+
+      filterExecuter = FilterUtil.getFilterExecuterTree(
+          filterExp, segmentProperties, null, minMaxCacheColumns);
+    }
+  }
+
+  public List<StreamFile> prune(List<Segment> segments) throws IOException {
+    if (filterExecuter == null) {
+      return listAllStreamFiles(segments, false);
+    } else {
+      List<StreamFile> streamFileList = new ArrayList<>();
+      for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
+        if (isScanRequire(streamFile)) {
+          streamFileList.add(streamFile);
+          streamFile.setMinMaxIndex(null);
+        }
+      }
+      return streamFileList;
+    }
+  }
+
+  private boolean isScanRequire(StreamFile streamFile) {
+    // backward compatibility, old stream file without min/max index
+    if (streamFile.getMinMaxIndex() == null) {
+      return true;
+    }
+
+    byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
+    byte[][] minValue =

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-31 Thread manishgupta88
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r214313170
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---
@@ -205,8 +205,9 @@ class StreamHandoffRDD[K, V](
     segmentList.add(Segment.toSegment(handOffSegmentId, null))
     val splits = inputFormat.getSplitsOfStreaming(
       job,
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier,
-      segmentList
+      segmentList,
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+      null
--- End diff --

Once you add the overloaded method as explained in the above comment, you can call the method with 3 arguments from here


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-31 Thread manishgupta88
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r214311953
  
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
@@ -342,60 +341,52 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
   /**
    * use file list in .carbonindex file to get the split of streaming.
    */
-  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
-      List<Segment> streamSegments) throws IOException {
+  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
     List<InputSplit> splits = new ArrayList<InputSplit>();
     if (streamSegments != null && !streamSegments.isEmpty()) {
       numStreamSegments = streamSegments.size();
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
-      for (Segment segment : streamSegments) {
-        String segmentDir =
-            CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
-        FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
-        if (FileFactory.isFileExist(segmentDir, fileType)) {
-          SegmentIndexFileStore segmentIndexFileStore = new SegmentIndexFileStore();
-          segmentIndexFileStore.readAllIIndexOfSegment(segmentDir);
-          Map<String, byte[]> carbonIndexMap = segmentIndexFileStore.getCarbonIndexMap();
-          CarbonIndexFileReader indexReader = new CarbonIndexFileReader();
-          for (byte[] fileData : carbonIndexMap.values()) {
-            indexReader.openThriftReader(fileData);
-            try {
-              // map block index
-              while (indexReader.hasNext()) {
-                BlockIndex blockIndex = indexReader.readBlockIndexInfo();
-                String filePath = segmentDir + File.separator + blockIndex.getFile_name();
-                Path path = new Path(filePath);
-                long length = blockIndex.getFile_size();
-                if (length != 0) {
-                  BlockLocation[] blkLocations;
-                  FileSystem fs = FileFactory.getFileSystem(path);
-                  FileStatus file = fs.getFileStatus(path);
-                  blkLocations = fs.getFileBlockLocations(path, 0, length);
-                  long blockSize = file.getBlockSize();
-                  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-                  long bytesRemaining = length;
-                  while (((double) bytesRemaining) / splitSize > 1.1) {
-                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
-                        splitSize, blkLocations[blkIndex].getHosts(),
-                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
-                    bytesRemaining -= splitSize;
-                  }
-                  if (bytesRemaining != 0) {
-                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                    splits.add(makeSplit(segment.getSegmentNo(), path, length - bytesRemaining,
-                        bytesRemaining, blkLocations[blkIndex].getHosts(),
-                        blkLocations[blkIndex].getCachedHosts(), FileFormat.ROW_V1));
-                  }
-                } else {
-                  //Create empty hosts array for zero length files
-                  splits.add(makeSplit(segment.getSegmentNo(), path, 0, length, new String[0],
-                      FileFormat.ROW_V1));
-                }
-              }
-            } finally {
-              indexReader.closeThriftReader();
+
+      if (filterResolverIntf == null) {
+        if (carbonTable != null) {
+          Expression filter = getFilterPredicates(job.getConfiguration());
+          if (filter != null) {
+            carbonTable.processFilterExpression(filter, null, null);
+            filterResolverIntf = carbonTable.resolveFilter(filter);
+          }
+        }
+      }
+      StreamDataMap streamDataMap =
+          DataMapStoreManager.getInstance().getStreamDataMap(carbonTable);
+      streamDataMap.init(filterResolverIntf);
+      List<StreamFile> streamFiles = streamDataMap.prune(streamSegments);
+      for (StreamFile streamFile : streamFiles) {
+        if (FileFactory.isFileExist(streamFile.getFilePath())) {
+          Path path = new Path(streamFile.getFilePath());
+          long length = streamFile.getFileSize();
+

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-31 Thread manishgupta88
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r214311329
  
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
@@ -342,60 +341,52 @@ public void refreshSegmentCacheIfRequired(JobContext job, CarbonTable carbonTabl
   /**
    * use file list in .carbonindex file to get the split of streaming.
    */
-  public List<InputSplit> getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier,
-      List<Segment> streamSegments) throws IOException {
+  public List<InputSplit> getSplitsOfStreaming(JobContext job, List<Segment> streamSegments,
+      CarbonTable carbonTable, FilterResolverIntf filterResolverIntf) throws IOException {
--- End diff --

You can write an overloaded method for getSplitsOfStreaming: one which accepts 3 parameters and one with 4 parameters.
1. getSplitsOfStreaming(JobContext job, AbsoluteTableIdentifier identifier, List<Segment> streamSegments)
-- From this method you can call the other method and pass null as the 4th argument. This will avoid passing null at all the call sites above.
2. getSplitsOfStreaming(JobContext job, List<Segment> streamSegments, CarbonTable carbonTable, FilterResolverIntf filterResolverIntf)


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-31 Thread manishgupta88
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r214305126
  
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+
+@InterfaceAudience.Internal
+public class StreamDataMap {
--- End diff --

Please check the feasibility of extending the DataMap interface and implementing all its methods, to keep this similar to BlockDataMap. I think it should be feasible.


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-31 Thread manishgupta88
Github user manishgupta88 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r214316607
  
--- Diff: streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---
@@ -212,9 +271,13 @@ private void initializeAtFirstRow() throws IOException, InterruptedException {
         byte[] col = (byte[]) columnValue;
         output.writeShort(col.length);
         output.writeBytes(col);
+        dimensionStatsCollectors[dimCount].update(col);
       } else {
         output.writeInt((int) columnValue);
+        dimensionStatsCollectors[dimCount].update(ByteUtil.toBytes((int) columnValue));
--- End diff --

For the min/max comparison you are converting from Int to byte array for all the rows. This can impact write performance. Instead, you can typecast to Int and do the comparison on primitives. After all the data is loaded, you can convert the values into byte arrays based on the datatype; at that point there will be only one conversion, for the final min/max values.
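The suggestion can be sketched like this (a hypothetical collector, not the PR's actual stats-collector API): keep primitive min/max while rows stream in, and serialize to bytes exactly once when the file is finalized.

```java
import java.nio.ByteBuffer;

class IntMinMaxCollector {
    private int min = Integer.MAX_VALUE;
    private int max = Integer.MIN_VALUE;

    // called once per row: two int comparisons, no byte[] allocation
    void update(int value) {
        if (value < min) min = value;
        if (value > max) max = value;
    }

    // one-time conversions when the stream file is closed
    byte[] minBytes() {
        return ByteBuffer.allocate(4).putInt(min).array();
    }

    byte[] maxBytes() {
        return ByteBuffer.allocate(4).putInt(max).array();
    }
}
```

This moves the per-row cost from an allocation plus byte-wise comparison down to two primitive comparisons.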


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-29 Thread xuchuanyin
Github user xuchuanyin commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r213613888
  
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+
+@InterfaceAudience.Internal
+public class StreamDataMap {
+
+  private CarbonTable carbonTable;
+
+  private AbsoluteTableIdentifier identifier;
+
+  private FilterExecuter filterExecuter;
+
+  public StreamDataMap(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+    this.identifier = carbonTable.getAbsoluteTableIdentifier();
+  }
+
+  public void init(FilterResolverIntf filterExp) {
+    if (filterExp != null) {
+
+      List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
+      for (CarbonDimension dimension : carbonTable.getDimensions()) {
+        if (!dimension.isComplex()) {
+          minMaxCacheColumns.add(dimension);
+        }
+      }
+      minMaxCacheColumns.addAll(carbonTable.getMeasures());
+
+      List<ColumnSchema> listOfColumns =
+          carbonTable.getTableInfo().getFactTable().getListOfColumns();
+      int[] columnCardinality = new int[listOfColumns.size()];
+      for (int index = 0; index < columnCardinality.length; index++) {
+        columnCardinality[index] = Integer.MAX_VALUE;
+      }
+
+      SegmentProperties segmentProperties =
+          new SegmentProperties(listOfColumns, columnCardinality);
+
+      filterExecuter = FilterUtil.getFilterExecuterTree(
+          filterExp, segmentProperties, null, minMaxCacheColumns);
+    }
+  }
+
+  public List<StreamFile> prune(List<Segment> segments) throws IOException {
+    if (filterExecuter == null) {
+      return listAllStreamFiles(segments, false);
+    } else {
+      List<StreamFile> streamFileList = new ArrayList<>();
+      for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
+        if (isScanRequire(streamFile)) {
+          streamFileList.add(streamFile);
+          streamFile.setMinMaxIndex(null);
+        }
+      }
+      return streamFileList;
+    }
+  }
+
+  private boolean isScanRequire(StreamFile streamFile) {
+    // backward compatibility, old stream file without min/max index
+    if (streamFile.getMinMaxIndex() == null) {
+      return true;
+    }
+
+    byte[][] maxValue = streamFile.getMinMaxIndex().getMaxValues();
+    byte[][] minValue =

[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-22 Thread xuchuanyin
Github user xuchuanyin commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r211868981
  
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+
+public class StreamDataMap {
+
+  private CarbonTable carbonTable;
+
+  private AbsoluteTableIdentifier identifier;
+
+  private FilterExecuter filterExecuter;
+
+  public StreamDataMap(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+    this.identifier = carbonTable.getAbsoluteTableIdentifier();
+  }
+
+  public void init(FilterResolverIntf filterExp) {
+    if (filterExp != null) {
+
+      List<CarbonColumn> minMaxCacheColumns = new ArrayList<>();
+      for (CarbonDimension dimension : carbonTable.getDimensions()) {
+        if (!dimension.isComplex()) {
+          minMaxCacheColumns.add(dimension);
+        }
+      }
+      minMaxCacheColumns.addAll(carbonTable.getMeasures());
+
+      List<ColumnSchema> listOfColumns =
+          carbonTable.getTableInfo().getFactTable().getListOfColumns();
+      int[] columnCardinality = new int[listOfColumns.size()];
+      for (int index = 0; index < columnCardinality.length; index++) {
+        columnCardinality[index] = Integer.MAX_VALUE;
+      }
+
+      SegmentProperties segmentProperties =
+          new SegmentProperties(listOfColumns, columnCardinality);
+
+      filterExecuter = FilterUtil.getFilterExecuterTree(
+          filterExp, segmentProperties, null, minMaxCacheColumns);
+    }
+  }
+
+  public List<StreamFile> prune(List<Segment> segments) throws IOException {
+    if (filterExecuter == null) {
+      return listAllStreamFiles(segments, false);
+    } else {
+      List<StreamFile> streamFileList = new ArrayList<>();
+      for (StreamFile streamFile : listAllStreamFiles(segments, true)) {
+        if (hitStreamFile(streamFile)) {
+          streamFileList.add(streamFile);
+          streamFile.setMinMaxIndex(null);
+        }
+      }
+      return streamFileList;
+    }
+  }
+
+  private boolean hitStreamFile(StreamFile streamFile) {
--- End diff --

I think changing the name to 'isScanRequired' would be better.


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-22 Thread xuchuanyin
Github user xuchuanyin commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r211854887
  
--- Diff: core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java ---
@@ -669,4 +666,44 @@ public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes, int
     return flattenedData;
   }
 
+  /**
+   * perform XOR operation on the value, and convert it to byte array for sorting
--- End diff --

please add a description for this method and the scenario in which it is used
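For context, the scenario is the one the quoted `toXorBytes` code addresses: flipping the sign bit (`val ^ MIN_VALUE`) maps two's-complement values onto an order-preserving unsigned range, so the big-endian bytes compare lexicographically in numeric order. Without the XOR, the bytes of a negative value would sort after those of a positive one. A small self-contained re-implementation of the idea (not the CarbonData source):

```java
import java.nio.ByteBuffer;

class XorBytesDemo {
    // same trick as ByteUtil.toXorBytes(int): flip the sign bit, then big-endian bytes
    static byte[] toXorBytes(int val) {
        return ByteBuffer.allocate(4).putInt(val ^ Integer.MIN_VALUE).array();
    }

    // unsigned lexicographic comparison, as applied to the serialized min/max bytes
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < a.length; i++) {
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        return 0;
    }
}
```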


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-22 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r211859543
  
--- Diff: 
core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java 
---
@@ -334,6 +334,10 @@ public TableDataMap getDataMap(CarbonTable table, 
DataMapSchema dataMapSchema) {
 return dataMap;
   }
 
--- End diff --

please add a testcase for the bigint datatype, covering the overflow scenario
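One overflow case such a test could cover (hypothetical helper names, not from the PR): a naive subtraction-based comparison of bigint (long) min/max wraps around for extreme values, while `Long.compare`, or the XOR/byte-wise form used in this PR, stays correct.

```java
class BigintOverflowDemo {
    // overflow-prone: Long.MAX_VALUE - (-1) wraps to Long.MIN_VALUE, giving the wrong sign
    static boolean naiveLess(long a, long b) {
        return (a - b) < 0;
    }

    // correct for the full long range
    static boolean safeLess(long a, long b) {
        return Long.compare(a, b) < 0;
    }
}
```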


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-22 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r211855812
  
--- Diff: streaming/src/main/java/org/apache/carbondata/streaming/index/StreamFileIndex.java ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming.index;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+public class StreamFileIndex implements Serializable {
+
+  private String fileName;
--- End diff --

please describe the content of `fileName`, i.e. whether it includes the whole path


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-22 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r211855246
  
--- Diff: streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java ---
@@ -171,9 +179,57 @@ private void initializeAtFirstRow() throws IOException, InterruptedException {
       writeFileHeader();
     }
 
+    initializeStatsCollector();
+
     isFirstRow = false;
   }
 
+  private void initializeStatsCollector() {
+    // initialize
--- End diff --

please explain why the length is isNoDictionaryDimensionColumn.length


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-22 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r211853068
  
--- Diff: core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java ---
@@ -669,4 +666,44 @@ public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes, int
     return flattenedData;
   }
 
+  /**
+   * perform XOR operation on the value, and convert it to byte array for sorting
+   */
+  public static byte[] toXorBytes(short val) {
+    val = (short)(val ^ Short.MIN_VALUE);
+    return toBytes(val);
+  }
+
+  public static byte[] toXorBytes(int val) {
+    val = val ^ Integer.MIN_VALUE;
+    return toBytes(val);
+  }
+
+  public static byte[] toXorBytes(long val) {
+    val = val ^ Long.MIN_VALUE;
+    return toBytes(val);
+  }
+
+  public static byte[] toXorBytes(double val) {
+    return toXorBytes(Double.doubleToLongBits(val));
+  }
+
+  /**
+   * convert byte array to the value, perform XOR operation on it to recover the real value
+   */
+  public static short toXorShort(byte[] bytes, int offset, final int length) {
+    return (short)(toShort(bytes, offset, length) ^ Short.MIN_VALUE);
+  }
+
+  public static int toXorInt(byte[] bytes, int offset, final int length) {

please add comment


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-22 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r211852629
  
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamFile.java ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+
+public class StreamFile {
--- End diff --

For all public classes, please add an interface annotation


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-22 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r211852492
  
--- Diff: core/src/main/java/org/apache/carbondata/core/datamap/StreamDataMap.java ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonIndexFileReader;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.BlockIndex;
+
+public class StreamDataMap {
--- End diff --

add interface annotation
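Aside from the annotation request, the quoted class is the heart of the PR: pruning stream files by per-file min/max statistics. A minimal, self-contained sketch of that idea (illustrative names and a single int column; not the PR's actual code):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of file-level min/max pruning: each stream file
// carries per-column min/max, and a file is skipped when an equality
// filter value cannot fall inside its [min, max] range.
public class MinMaxPruneDemo {

  static class StreamFile {
    final String path;
    final int min;  // column min for this file
    final int max;  // column max for this file
    StreamFile(String path, int min, int max) {
      this.path = path;
      this.min = min;
      this.max = max;
    }
  }

  // keep only files whose range can contain the filter value
  static List<StreamFile> prune(List<StreamFile> files, int filterValue) {
    List<StreamFile> hits = new ArrayList<>();
    for (StreamFile f : files) {
      if (filterValue >= f.min && filterValue <= f.max) {
        hits.add(f);
      }
    }
    return hits;
  }

  public static void main(String[] args) {
    List<StreamFile> files = new ArrayList<>();
    files.add(new StreamFile("part-0", 1, 100));
    files.add(new StreamFile("part-1", 101, 200));
    files.add(new StreamFile("part-2", 201, 300));
    // only part-1 can contain 150; the other two files are skipped
    for (StreamFile f : prune(files, 150)) {
      System.out.println(f.path);
    }
  }
}
```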


---


[GitHub] carbondata pull request #2644: [CARBONDATA-2853] Implement file-level min/ma...

2018-08-21 Thread xuchuanyin
Github user xuchuanyin commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2644#discussion_r211805649
  
--- Diff: core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java ---
@@ -324,13 +324,13 @@ public static Object getDataBasedOnDataType(String data, DataType actualDataType
     if (actualDataType == DataTypes.BOOLEAN) {
       return ByteUtil.toBytes(BooleanConvert.parseBoolean(dimensionValue));
     } else if (actualDataType == DataTypes.SHORT) {
-      return ByteUtil.toBytes(Short.parseShort(dimensionValue));
+      return ByteUtil.toXorBytes(Short.parseShort(dimensionValue));
--- End diff --

Will this affect the legacy store?
It seems that a value will be encoded differently before and after this modification. If this method was used anywhere to encode and store data before, that will be a problem.
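The concern can be made concrete: the same value encodes to different bytes before and after the change, so bytes written by an older store would decode to the wrong value under the new scheme. A self-contained sketch with stand-in `toBytes`/`toXorBytes` helpers (hypothetical values, not CarbonData's actual code):

```java
import java.util.Arrays;

// Sketch illustrating the compatibility concern: a short value encodes to
// different bytes via the plain encoding (toBytes) versus the XOR encoding
// (toXorBytes), so data written by an older store would be misread if
// decoded with the XOR variant.
public class EncodingCompatDemo {

  // big-endian encoding of a short (stand-in for the plain encoder)
  static byte[] toBytes(short val) {
    return new byte[] { (byte) (val >> 8), (byte) val };
  }

  // new encoding: flip the sign bit first
  static byte[] toXorBytes(short val) {
    return toBytes((short) (val ^ Short.MIN_VALUE));
  }

  // new decoder: undo the sign-bit flip
  static short toXorShort(byte[] bytes) {
    short raw = (short) (((bytes[0] & 0xFF) << 8) | (bytes[1] & 0xFF));
    return (short) (raw ^ Short.MIN_VALUE);
  }

  public static void main(String[] args) {
    short value = 42;
    byte[] legacy = toBytes(value);      // what an old store wrote
    byte[] current = toXorBytes(value);  // what the new code writes
    System.out.println(Arrays.equals(legacy, current));  // false: not interchangeable
    // decoding legacy bytes with the new decoder yields a wrong value
    System.out.println(toXorShort(legacy));  // -32726, not 42
  }
}
```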


---