[GitHub] carbondata pull request #1736: [CARBONDATA-1904][CARBONDATA-1905] Support au...

2017-12-29 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1736#discussion_r159039529
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
 ---
@@ -80,6 +80,12 @@ class CarbonAppendableStreamSink(
 CarbonProperties.getInstance().getHandoffSize
   )
 
+  // auto handoff
+  private val enableAutoHandoff = hadoopConf.getBoolean(
+CarbonCommonConstants.ENABLE_AUTO_HANDOFF,
--- End diff --

Please document the property name in the docs as well.


---


[GitHub] carbondata pull request #1736: [CARBONDATA-1904][CARBONDATA-1905] Support au...

2017-12-28 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1736#discussion_r159030981
  
--- Diff: 
core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 ---
@@ -1459,6 +1459,13 @@
*/
   public static final String HANDOFF_SIZE = 
"carbon.streaming.segment.max.size";
 
+  /**
+   * enable auto handoff streaming segment
+   */
+  public static final String ENABLE_AUTO_HANDOFF = 
"carbon.enable.auto.handoff";
--- End diff --

I think `carbon.streaming.auto.handoff.enabled` is better.
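For illustration, a minimal sketch of how the renamed key might be declared and consumed; the `StreamingConfig` holder and the default of `true` are assumptions, not the actual CarbonData API:

```
// Hypothetical holder for the renamed key (the real home would be
// CarbonCommonConstants); the default value is assumed for illustration.
object StreamingConfig {
  val ENABLE_AUTO_HANDOFF: String = "carbon.streaming.auto.handoff.enabled"
  val ENABLE_AUTO_HANDOFF_DEFAULT: Boolean = true
}

// Mirroring the diff above, the sink would then read:
//   private val enableAutoHandoff = hadoopConf.getBoolean(
//     StreamingConfig.ENABLE_AUTO_HANDOFF,
//     StreamingConfig.ENABLE_AUTO_HANDOFF_DEFAULT)
```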


---


[GitHub] carbondata pull request #1665: [CARBONDATA-1884] Add CTAS support to carbond...

2017-12-17 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1665#discussion_r157410277
  
--- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---
@@ -280,25 +280,26 @@ class CarbonOptimizer(
   }
 }
 
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) 
extends
-  SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, 
sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
 
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser)
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
 
   override def visitCreateHiveTable(ctx: CreateHiveTableContext): 
LogicalPlan = {
 val fileStorage = helper.getFileStorage(ctx.createFileFormat)
 
 if (fileStorage.equalsIgnoreCase("'carbondata'") ||
 fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
   helper.createCarbonTable(ctx.createTableHeader,
-  ctx.skewSpec,
--- End diff --

@jackylk We should use the right format from the start, i.e. 2 spaces instead of 4 for continuation indents. We should also avoid unrelated modifications, so each PR keeps a small diff: easier to review and easier to rebase. The requested style is sketched below.
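To make the style concrete, a tiny sketch; the method and its arguments are hypothetical stand-ins for the builder call in the diff:

```
// Stand-in declaration, only to show the continuation indent:
def createCarbonTable(header: String, skewSpec: Option[String]): Unit = ()

createCarbonTable(
  "tableHeader",  // each argument on its own line,
  None)           // indented by 2 spaces rather than 4
```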


---


[GitHub] carbondata pull request #1665: [CARBONDATA-1884] Add CTAS support to carbond...

2017-12-17 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1665#discussion_r157409856
  
--- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---
@@ -259,25 +259,26 @@ object CarbonOptimizerUtil {
   }
 }
 
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) 
extends
-  SparkSqlAstBuilder(conf) {
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, 
sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
 
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser)
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
 
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
 val fileStorage = helper.getFileStorage(ctx.createFileFormat)
 
 if (fileStorage.equalsIgnoreCase("'carbondata'") ||
 fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
   helper.createCarbonTable(ctx.createTableHeader,
--- End diff --

Move `ctx.createTableHeader` to the next line.


---


[GitHub] carbondata pull request #1665: [CARBONDATA-1884] Add CTAS support to carbond...

2017-12-17 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1665#discussion_r157408233
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.table
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.MetadataCommand
+import 
org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param query the query whose result will be insert into the new relation
+ * @param tableInfo the Table Describe, which may contains serde, storage 
handler etc.
+ * @param ifNotExistsSet allow continue working if it's already exists, 
otherwise
+ *  raise exception
+ * @param tableLocation store location where the table need to be created
+ */
+case class CarbonCreateTableAsSelectCommand(query: LogicalPlan,
+tableInfo: TableInfo,
+ifNotExistsSet: Boolean = false,
+tableLocation: Option[String] = None) extends MetadataCommand {
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+val tableName = tableInfo.getFactTable.getTableName
+var databaseOpt: Option[String] = None
+if (tableInfo.getDatabaseName != null) {
+  databaseOpt = Some(tableInfo.getDatabaseName)
+}
+val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+LOGGER.audit(s"Request received for CTAS for $dbName.$tableName")
+lazy val carbonDataSourceHadoopRelation = {
--- End diff --

Why do you use `lazy` here?
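For readers following the thread, a minimal sketch of what `lazy` changes in Scala: the right-hand side runs on first access instead of at declaration, which matters if the relation is expensive to build or may never be needed.

```
val eager = { println("eager: evaluated now"); 1 }         // prints immediately
lazy val deferred = { println("lazy: evaluated late"); 2 } // prints nothing yet
println(deferred) // first access triggers evaluation, then prints 2
```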


---


[GitHub] carbondata pull request #863: [CARBONDATA-998] Don't request executors when ...

2017-12-14 Thread watermen
Github user watermen closed the pull request at:

https://github.com/apache/carbondata/pull/863


---


[GitHub] carbondata pull request #606: [CARBONDATA-713] Make the store path in right ...

2017-12-14 Thread watermen
Github user watermen closed the pull request at:

https://github.com/apache/carbondata/pull/606


---


[GitHub] carbondata issue #1245: [CARBONDATA-1366]Change rdd storage level to 'MEMORY...

2017-08-08 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/1245
  
@jackylk Agreed with @xuchuanyin: Spark's storage levels are meant to provide different trade-offs between memory usage and CPU efficiency, so different environments call for different storage levels. We'd better add a conf named 'storage_level' whose default value is MEMORY_ONLY (the same as Spark's default), as sketched below. More information about storage levels is on the Spark website: http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
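A minimal sketch of the proposal, assuming a system-property lookup; the key name 'carbon.load.rdd.storage.level' is hypothetical:

```
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Resolve the level from configuration, defaulting to Spark's own default.
val levelName = sys.props.getOrElse("carbon.load.rdd.storage.level", "MEMORY_ONLY")
val level = StorageLevel.fromString(levelName) // throws on an unknown name

def persistForLoad[T](rdd: RDD[T]): RDD[T] = rdd.persist(level)
```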
 


---


[GitHub] carbondata issue #1223: [WIP] Support cleaning garbage segment in all tables

2017-08-02 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/1223
  
LGTM


---


[GitHub] carbondata pull request #1037: [CARBONDATA-1163][Bug-Fix] Rollback the code ...

2017-06-15 Thread watermen
GitHub user watermen opened a pull request:

https://github.com/apache/carbondata/pull/1037

[CARBONDATA-1163][Bug-Fix] Rollback the code because it can't reuse the 
convertRow



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/watermen/incubator-carbondata load-fixbug2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/carbondata/pull/1037.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1037


commit 36923c2f782b4401bf45b9fc7dfcffeaec44befa
Author: Yadong Qi <qiyadong2...@gmail.com>
Date:   2017-06-15T07:16:28Z

Don't reuse the convertRow.




---


[GitHub] carbondata issue #972: [CARBONDATA-1065] Added set command in carbon to upda...

2017-06-14 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/972
  
@mohammadshahidkhan But we'd better follow the habits of users who are already familiar with Spark/Hive/HDFS.


---


[GitHub] carbondata issue #972: [CARBONDATA-1065] Added set command in carbon to upda...

2017-06-14 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/972
  
I think we'd better use `SET carbon.load.sort.scope = LOCAL_SORT` instead of `SET SORT_SCOPE = LOCAL_SORT`; both forms are shown below. @mohammadshahidkhan can you explain the reasoning? cc @jackylk
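For context, the two spellings side by side, issued through a Spark session (`spark` is assumed to be a Carbon-enabled SparkSession):

```
// Fully-qualified key, consistent with carbon.properties naming:
spark.sql("SET carbon.load.sort.scope=LOCAL_SORT")
// Bare key, as the PR currently proposes:
spark.sql("SET SORT_SCOPE=LOCAL_SORT")
```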


---


[GitHub] carbondata issue #1000: [CARBONDATA-1018] Add unsafe ColumnPage implementati...

2017-06-14 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/1000
  
LGTM


---


[GitHub] carbondata pull request #1000: [CARBONDATA-1018] Add unsafe ColumnPage imple...

2017-06-14 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1000#discussion_r121876896
  
--- Diff: 
core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---
@@ -323,15 +340,8 @@ public double getDouble(int rowId) {
 return doubleData[rowId];
   }
 
-  /**
-   * Get decimal value at rowId
-   */
-  public byte[] getDecimalBytes(int rowId) {
-return byteArrayData[rowId];
-  }
-
   public BigDecimal getDecimal(int rowId) {
--- End diff --

Add a doc comment like the one on `getDouble`.


---


[GitHub] carbondata pull request #1000: [CARBONDATA-1018] Add unsafe ColumnPage imple...

2017-06-14 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1000#discussion_r121875823
  
--- Diff: 
core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---
@@ -508,7 +567,7 @@ public static ColumnPage decompress(Compressor 
compressor, DataType dataType,
   }
 
   // input byte[] is LV encoded, this function can expand it into byte[][]
-  private static byte[][] deflatten(byte[] input) {
+  protected static byte[][] deflatten(byte[] input) {
--- End diff --

Agree with erlu.


---


[GitHub] carbondata pull request #1000: [CARBONDATA-1018] Add unsafe ColumnPage imple...

2017-06-14 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1000#discussion_r121874665
  
--- Diff: 
core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java ---
@@ -98,56 +117,54 @@ public static ColumnPage newPage(DataType dataType, 
int pageSize) {
   default:
 throw new RuntimeException("Unsupported data dataType: " + 
dataType);
 }
-instance.stats = new ColumnPageStatsVO(dataType);
-instance.nullBitSet = new BitSet(pageSize);
 return instance;
   }
 
   protected static ColumnPage newBytePage(byte[] byteData) {
 ColumnPage columnPage = new ColumnPage(BYTE, byteData.length);
--- End diff --

If it's not supported, we'd better add a note explaining why.


---


[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process

2017-06-12 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/910#discussion_r121315052
  
--- Diff: 
processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortHelper.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.sort;
+
+import 
org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.util.NonDictionaryUtil;
+
+public class SortHelper {
--- End diff --

In `SortTempFileChunkHolder.getRowFromStream`, the data comes from a stream, so we can't reuse the function in `SortStepRowUtil`.


---


[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process

2017-06-11 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/910#discussion_r121306203
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSort.scala
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import java.util.Comparator
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.csvload.{CSVInputFormat, 
StringArrayWritable}
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
+import 
org.apache.carbondata.processing.sortandgroupby.sortdata.{NewRowComparator, 
NewRowComparatorForNormalDims, SortParameters}
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.storage.StorageLevel
+
+/**
+  * Use sortBy operator in spark to load the data
+  */
+object GlobalSort {
--- End diff --

Rename it to `DataLoadProcess`.


---


[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process

2017-06-11 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/910#discussion_r121305832
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortOperates.scala
 ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.csvload.StringArrayWritable
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
+import 
org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl
+import org.apache.carbondata.processing.newflow.sort.SortHelper
+import 
org.apache.carbondata.processing.newflow.steps.{DataConverterProcessorStepImpl, 
DataWriterProcessorStepImpl}
+import 
org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters
+import org.apache.carbondata.processing.store.{CarbonFactHandler, 
CarbonFactHandlerFactory}
+import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.Row
+import org.apache.spark.util.LongAccumulator
+import org.apache.spark.{SparkEnv, TaskContext}
+
+import scala.util.Random
+
+object GlobalSortOperates {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def toStringArrayRow(row: StringArrayWritable, columnCount: Int): 
StringArrayRow = {
+val outRow = new StringArrayRow(new Array[String](columnCount))
+outRow.setValues(row.get())
+  }
+
+  def toRDDIterator(
+  rows: Iterator[Row],
+  modelBroadcast: Broadcast[CarbonLoadModel]): Iterator[Array[AnyRef]] 
= {
+new Iterator[Array[AnyRef]] {
+  val iter = new NewRddIterator(rows, modelBroadcast.value, 
TaskContext.get())
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Array[AnyRef] = iter.next
+}
+  }
+
+  def inputFunc(
+  rows: Iterator[Array[AnyRef]],
+  index: Int,
+  currentLoadCount: Int,
+  modelBroadcast: Broadcast[CarbonLoadModel],
+  rowNumber: LongAccumulator): Iterator[CarbonRow] = {
+val model: CarbonLoadModel = getModelCopy(index, currentLoadCount, 
modelBroadcast)
+val conf = DataLoadProcessBuilder.createConfiguration(model)
+val rowParser = new RowParserImpl(conf.getDataFields, conf)
+
+TaskContext.get().addTaskFailureListener { (t: TaskContext, e: 
Throwable) =>
+  wrapException(e, model)
+}
+
+new Iterator[CarbonRow] {
+  override def hasNext: Boolean = rows.hasNext
+
+  override def next(): CarbonRow = {
+rowNumber.add(1)
--- End diff --

Done


---


[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process

2017-06-11 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/910#discussion_r121305557
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortOperates.scala
 ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.csvload.StringArrayWritable
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
+import 
org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl
+import org.apache.carbondata.processing.newflow.sort.SortHelper
+import 
org.apache.carbondata.processing.newflow.steps.{DataConverterProcessorStepImpl, 
DataWriterProcessorStepImpl}
+import 
org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters
+import org.apache.carbondata.processing.store.{CarbonFactHandler, 
CarbonFactHandlerFactory}
+import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.Row
+import org.apache.spark.util.LongAccumulator
+import org.apache.spark.{SparkEnv, TaskContext}
+
+import scala.util.Random
+
+object GlobalSortOperates {
--- End diff --

Rename it to `DataLoadProcessorStep`.


---


[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process

2017-06-11 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/910#discussion_r121305190
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSort.scala
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import java.util.Comparator
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.csvload.{CSVInputFormat, 
StringArrayWritable}
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder
+import 
org.apache.carbondata.processing.sortandgroupby.sortdata.{NewRowComparator, 
NewRowComparatorForNormalDims, SortParameters}
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.rdd.NewHadoopRDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.storage.StorageLevel
+
+/**
+  * Use sortBy operator in spark to load the data
+  */
+object GlobalSort {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def loadDataUsingGlobalSort(
+  sc: SparkContext,
+  dataFrame: Option[DataFrame],
+  model: CarbonLoadModel,
+  currentLoadCount: Int): Array[(String, (LoadMetadataDetails, 
ExecutionErrors))] = {
+val originRDD = if (dataFrame.isDefined) {
+  dataFrame.get.rdd
+} else {
+  // input data from files
+  val hadoopConfiguration = new Configuration()
+  CommonUtil.configureCSVInputFormat(hadoopConfiguration, model)
+  hadoopConfiguration.set(FileInputFormat.INPUT_DIR, 
model.getFactFilePath)
+  val columnCount = model.getCsvHeaderColumns.length
+  new NewHadoopRDD[NullWritable, StringArrayWritable](
+sc,
+classOf[CSVInputFormat],
+classOf[NullWritable],
+classOf[StringArrayWritable],
+hadoopConfiguration)
+.map(x => GlobalSortOperates.toStringArrayRow(x._2, columnCount))
+}
+
+val modelBroadcast = sc.broadcast(model)
+val partialSuccessAccum = sc.longAccumulator("Partial Success 
Accumulator")
+
+val inputStepRowNumber = sc.longAccumulator("Input Processor 
Accumulator")
+val convertStepRowNumber = sc.longAccumulator("Convert Processor 
Accumulator")
+val sortStepRowNumber = sc.longAccumulator("Sort Processor 
Accumulator")
+val writeStepRowNumber = sc.longAccumulator("Write Processor 
Accumulator")
+
+// 1. Input
+val inputRDD = originRDD.mapPartitions(rows => 
GlobalSortOperates.toRDDIterator(rows, modelBroadcast))
+  .mapPartitionsWithIndex { case (index, rows) =>
+GlobalSortOperates.inputFunc(rows, index, currentLoadCount, 
modelBroadcast, inputStepRowNumber)
+  }
+
+// 2. Convert
+val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) 
=>
+  GlobalSortOperates.convertFunc(rows, index, currentLoadCount, 
modelBroadcast, partialSuccessAccum,
+convertStepRowNumber)
+}.filter(_ != null)// Filter the bad record
+
+// 3. Sort
+val configuration = DataLoadProcessBuilder.createConfiguration(model)
+val sortParameters = SortParameters.createSortP

[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process

2017-06-11 Thread watermen
Github user watermen commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/910#discussion_r121304371
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.load
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+import 
org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger
+import org.apache.spark.util.LongAccumulator
+
+object GlobalSortHelper {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def badRecordsLogger(loadModel: CarbonLoadModel, badRecordsAccum: 
LongAccumulator): Unit = {
+val key = new CarbonTableIdentifier(loadModel.getDatabaseName, 
loadModel.getTableName, null).getBadRecordLoggerKey
+if (null != BadRecordsLogger.hasBadRecord(key)) {
+  LOGGER.error("Data Load is partially success for table " + 
loadModel.getTableName)
+  badRecordsAccum.add(1)
+} else {
+  LOGGER.info("Data loading is successful for table " + 
loadModel.getTableName)
+}
+  }
+
+  def tryWithSafeFinally[T](tableName: String, block: => T)
--- End diff --

Delete


---


[GitHub] carbondata issue #978: [CARBONDATA-1109] Acquire semaphore before submit a p...

2017-06-02 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/978
  
retest this please


---


[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...

2017-06-01 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/978
  
@ravipesala Thanks for your solution, PR updated.


---


[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...

2017-06-01 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/978
  
@ravipesala You can reproduce this case by adding some logging and running the sample.csv loading testcase (TestDataLoadWithFileName).
It's hard to reproduce, so I added a sleep in the Producer to simulate the time spent processing data.

![image](https://cloud.githubusercontent.com/assets/1400819/26669092/19e9535e-46df-11e7-9d02-c12b05bb4a90.png)

![image](https://cloud.githubusercontent.com/assets/1400819/26669073/0c56c64a-46df-11e7-8071-6ba2e7fa5785.png)
The log below shows the lost page:
```
00:15:42 [Thread-65]###addDataToStore
00:15:42 [Thread-65]###addDataToStore
00:15:43 [Thread-73]###Put ---> isWriteAll:falseindex:1
00:15:44 [Thread-72]###Put ---> isWriteAll:falseindex:0
00:15:44 [Thread-71]###Get ---> isWriteAll:falseindex:0
00:15:44 [Thread-71]###Get ---> isWriteAll:falseindex:1
00:15:44 [Thread-65]###addDataToStore
00:15:44 [Thread-65]###addDataToStore
00:15:44 [Thread-65]###finish
00:15:46 [Thread-72]###Put ---> isWriteAll:falseindex:1
00:15:46 [Thread-72]###Put ---> isWriteAll:true index:0
00:15:46 [Thread-71]###Get ---> isWriteAll:true index:0// The last page is not consumed at the end.
00:15:46 [Thread-71]###Get ---> isWriteAll:falseindex:1
00:15:47 [Thread-73]###Put ---> isWriteAll:falseindex:0
00:15:47 [Thread-71]###Get ---> isWriteAll:falseindex:0
```
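A rough sketch of the simulation described above; the producer body is a hypothetical stand-in, and the sleep only widens the race window so the unconsumed last page becomes observable:

```
import java.util.concurrent.BlockingQueue

// Stand-in producer: delay before publishing a page, as in the reproduction.
class SlowProducer(queue: BlockingQueue[Array[Byte]], page: Array[Byte])
  extends Runnable {
  override def run(): Unit = {
    Thread.sleep(2000)  // simulate the time spent producing the page
    queue.put(page)     // the final put can race with the consumer's shutdown
  }
}
```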






---


[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...

2017-05-31 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/978
  
@kumarvishal09 It is very hard to write a testcase because this case happens only occasionally.


---


[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...

2017-05-31 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/978
  
retest this please


---


[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...

2017-05-31 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/978
  
retest this please


---


[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...

2017-05-31 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/carbondata/pull/978
  
test it please


---


[GitHub] carbondata pull request #719: [WIP][CARBONDATA-844] Avoid to get useless spl...

2017-05-31 Thread watermen
Github user watermen closed the pull request at:

https://github.com/apache/carbondata/pull/719


---


[GitHub] incubator-carbondata pull request #863: [CARBONDATA-998] Don't request execu...

2017-04-27 Thread watermen
GitHub user watermen opened a pull request:

https://github.com/apache/incubator-carbondata/pull/863

[CARBONDATA-998] Don't request executors when we use carbon distribution

In the current implementation, Carbon requests executors whenever it needs more. This raises the following issues:
1. Carbon interferes with Spark's scheduling by requesting/removing executors, which causes confusion.
2. Carbon invokes DistributionUtil.getDistinctNodesList and sleeps for a while to make sure the executors are assigned, which wastes a lot of time.

We'd better keep the data scan and the scheduling independent.

### In the CarbonData layer
Make one task scan multiple splits, so each task has a larger input and there are fewer tasks overall (see the sketch below).
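A minimal sketch of the split-grouping idea under these assumptions; the names are illustrative, not the actual CarbonData API:

```
// Pack the split list into fewer groups so each task scans several splits.
def groupSplits[T](splits: Seq[T], tasksWanted: Int): Seq[Seq[T]] = {
  val perTask = math.max(1, math.ceil(splits.size.toDouble / tasksWanted).toInt)
  splits.grouped(perTask).toSeq
}
```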
### In the Spark layer (dynamic allocation enabled)
Request executors only if resources are released by other apps and we have not reached the maximum number of executors.

For now, I only invoke DistributionUtil.getNodeList instead of DistributionUtil.ensureExecutorsAndGetNodeList. We can discuss this idea here, and then I can remove the now-unused code around DistributionUtil.ensureExecutorsAndGetNodeList.

cc @jackylk @QiangCai @kumarvishal09 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/watermen/incubator-carbondata CARBONDATA-998

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-carbondata/pull/863.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #863


commit 824d0f7790f5559d48be6b1878786bc384af51a9
Author: Yadong Qi <qiyadong2...@gmail.com>
Date:   2017-04-27T07:54:49Z

Don't request executors when we use carbon distribution




---


[GitHub] incubator-carbondata issue #659: [CARBONDATA-781] Reuse SegmentProperties ob...

2017-04-24 Thread watermen
Github user watermen commented on the issue:

https://github.com/apache/incubator-carbondata/pull/659
  
@jackylk @kumarvishal09 Can you review the code again?


---