[GitHub] carbondata pull request #1736: [CARBONDATA-1904][CARBONDATA-1905] Support au...
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1736#discussion_r159039529 --- Diff: streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala --- @@ -80,6 +80,12 @@ class CarbonAppendableStreamSink( CarbonProperties.getInstance().getHandoffSize ) + // auto handoff + private val enableAutoHandoff = hadoopConf.getBoolean( +CarbonCommonConstants.ENABLE_AUTO_HANDOFF, --- End diff -- Please add this property name to the documentation. ---
[GitHub] carbondata pull request #1736: [CARBONDATA-1904][CARBONDATA-1905] Support au...
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1736#discussion_r159030981 --- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java --- @@ -1459,6 +1459,13 @@ */ public static final String HANDOFF_SIZE = "carbon.streaming.segment.max.size"; + /** + * enable auto handoff streaming segment + */ + public static final String ENABLE_AUTO_HANDOFF = "carbon.enable.auto.handoff"; --- End diff -- I think `carbon.streaming.auto.handoff.enabled` is better. ---
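For reference, here is a minimal sketch of reading the proposed key with a default. It uses `java.util.Properties` instead of the Hadoop `Configuration.getBoolean` call in the diff so that it stays self-contained. The class name `StreamingConf` and the default value `true` are assumptions, not CarbonData's. The `carbon.streaming.` prefix keeps the key next to the existing `carbon.streaming.segment.max.size`, and the `.enabled` suffix makes it read as a boolean.

```java
import java.util.Properties;

// Hypothetical sketch: the key is the name proposed in the review, and the
// default value is an assumption made for illustration only.
final class StreamingConf {
    /** Proposed key, grouped with carbon.streaming.segment.max.size. */
    static final String ENABLE_AUTO_HANDOFF = "carbon.streaming.auto.handoff.enabled";

    /** Assumed default; the PR excerpt does not show the real one. */
    static final boolean ENABLE_AUTO_HANDOFF_DEFAULT = true;

    private StreamingConf() {}

    /** Returns the configured flag, or the default when the key is unset or blank. */
    static boolean isAutoHandoffEnabled(Properties props) {
        String raw = props.getProperty(ENABLE_AUTO_HANDOFF);
        if (raw == null || raw.trim().isEmpty()) {
            return ENABLE_AUTO_HANDOFF_DEFAULT;
        }
        // Boolean.parseBoolean ignores case and treats anything but "true" as false.
        return Boolean.parseBoolean(raw.trim());
    }
}
```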
[GitHub] carbondata pull request #1665: [CARBONDATA-1884] Add CTAS support to carbond...
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1665#discussion_r157410277 --- Diff: integration/spark2/src/main/spark2.2/CarbonSessionState.scala --- @@ -280,25 +280,26 @@ class CarbonOptimizer( } } -class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends - SparkSqlAstBuilder(conf) { +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession) + extends SparkSqlAstBuilder(conf) { - val helper = new CarbonHelperSqlAstBuilder(conf, parser) + val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession) override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = { val fileStorage = helper.getFileStorage(ctx.createFileFormat) if (fileStorage.equalsIgnoreCase("'carbondata'") || fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { helper.createCarbonTable(ctx.createTableHeader, - ctx.skewSpec, --- End diff -- @jackylk We should use the right format from the start, i.e. 2 spaces of indentation instead of 4. We should avoid unrelated modifications so that each MR keeps its diff small, which makes it easier to review and easier to rebase. ---
[GitHub] carbondata pull request #1665: [CARBONDATA-1884] Add CTAS support to carbond...
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1665#discussion_r157409856 --- Diff: integration/spark2/src/main/spark2.1/CarbonSessionState.scala --- @@ -259,25 +259,26 @@ object CarbonOptimizerUtil { } } -class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends - SparkSqlAstBuilder(conf) { +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession) + extends SparkSqlAstBuilder(conf) { - val helper = new CarbonHelperSqlAstBuilder(conf, parser) + val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession) override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { val fileStorage = helper.getFileStorage(ctx.createFileFormat) if (fileStorage.equalsIgnoreCase("'carbondata'") || fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { helper.createCarbonTable(ctx.createTableHeader, --- End diff -- Move `ctx.createTableHeader` to the next line. ---
[GitHub] carbondata pull request #1665: [CARBONDATA-1884] Add CTAS support to carbond...
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1665#discussion_r157408233 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.table + +import scala.util.control.NonFatal + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.MetadataCommand +import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.schema.table.TableInfo + +/** + * Create table and insert the query result into it. + * + * @param query the query whose result will be insert into the new relation + * @param tableInfo the Table Describe, which may contains serde, storage handler etc. 
+ * @param ifNotExistsSet allow continue working if it's already exists, otherwise + * raise exception + * @param tableLocation store location where the table need to be created + */ +case class CarbonCreateTableAsSelectCommand(query: LogicalPlan, +tableInfo: TableInfo, +ifNotExistsSet: Boolean = false, +tableLocation: Option[String] = None) extends MetadataCommand { + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { +val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) +val tableName = tableInfo.getFactTable.getTableName +var databaseOpt: Option[String] = None +if (tableInfo.getDatabaseName != null) { + databaseOpt = Some(tableInfo.getDatabaseName) +} +val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession) +LOGGER.audit(s"Request received for CTAS for $dbName.$tableName") +lazy val carbonDataSourceHadoopRelation = { --- End diff -- Why do you use `lazy` here? ---
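Some background for the question. A Scala `lazy val` runs its initializer on first access and caches the result, so it only pays off when some code paths never read the value. The sketch below mimics those semantics in Java with a hypothetical `Lazy` holder. It is not a CarbonData or Scala class.

```java
import java.util.function.Supplier;

// Hypothetical Java analogue of a Scala `lazy val`: the initializer runs at most
// once, on the first get(), and the cached result is returned afterwards.
final class Lazy<T> implements Supplier<T> {
    private Supplier<? extends T> initializer;
    private T value;
    private boolean initialized;

    Lazy(Supplier<? extends T> initializer) {
        this.initializer = initializer;
    }

    @Override
    public synchronized T get() {
        if (!initialized) {
            value = initializer.get();
            initialized = true;
            initializer = null; // drop the closure once it has been evaluated
        }
        return value;
    }
}
```

The diff excerpt is cut off, so it does not show whether `carbonDataSourceHadoopRelation` is read on every path through `processMetadata`. If it is, a plain `val` would behave the same and avoid the synchronized initialization that Scala 2 generates for `lazy val`.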
[GitHub] carbondata pull request #863: [CARBONDATA-998] Don't request executors when ...
Github user watermen closed the pull request at: https://github.com/apache/carbondata/pull/863 ---
[GitHub] carbondata pull request #606: [CARBONDATA-713] Make the store path in right ...
Github user watermen closed the pull request at: https://github.com/apache/carbondata/pull/606 ---
[GitHub] carbondata issue #1245: [CARBONDATA-1366]Change rdd storage level to 'MEMORY...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/1245 @jackylk I agree with @xuchuanyin. Spark's storage levels are meant to provide different trade-offs between memory usage and CPU efficiency, so different environments call for different storage levels. We had better add a conf named 'storage_level' whose default value is MEMORY_ONLY (the same as Spark's default). More information about storage levels is on the Spark website (http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
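Here is a rough sketch of the proposed `storage_level` conf: parse the user's value and fall back to MEMORY_ONLY when it is missing. The enum is a local stand-in that lists a subset of Spark's level names. It is not Spark's `StorageLevel` class, and the fallback-on-unknown behaviour is an assumption. A stricter version might reject unknown names instead.

```java
import java.util.Locale;

// Hypothetical stand-in for a configurable storage level. The constant names
// mirror a subset of Spark's storage levels; the parsing policy is an assumption.
enum StorageLevelName {
    MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER, DISK_ONLY, OFF_HEAP;

    /** Same default Spark uses for RDD.cache()/persist(). */
    static final StorageLevelName DEFAULT = MEMORY_ONLY;

    /** Parses a conf value case-insensitively; unset or unknown values fall back to DEFAULT. */
    static StorageLevelName fromConf(String value) {
        if (value == null || value.trim().isEmpty()) {
            return DEFAULT;
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return DEFAULT;
        }
    }
}
```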
[GitHub] carbondata issue #1223: [WIP] Support cleaning garbage segment in all tables
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/1223 LGTM ---
[GitHub] carbondata pull request #1037: [CARBONDATA-1163][Bug-Fix] Rollback the code ...
GitHub user watermen opened a pull request: https://github.com/apache/carbondata/pull/1037 [CARBONDATA-1163][Bug-Fix] Rollback the code because it can't reuse the convertRow You can merge this pull request into a Git repository by running: $ git pull https://github.com/watermen/incubator-carbondata load-fixbug2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/1037.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1037 commit 36923c2f782b4401bf45b9fc7dfcffeaec44befa Author: Yadong Qi <qiyadong2...@gmail.com> Date: 2017-06-15T07:16:28Z Don't reuse the converRow. ---
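The PR title says only that the convertRow "can't" be reused; the exact failure is not in this excerpt. The sketch below shows the general hazard under that assumption. If one mutable row object is refilled for every record while earlier references are still held downstream, every buffered reference ends up seeing the last record. `RowReuseDemo` and `MutableRow` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the generic row-reuse hazard, not CarbonData code.
final class RowReuseDemo {
    static final class MutableRow {
        Object[] values;
        MutableRow(Object[] values) { this.values = values; }
    }

    /** Buggy: one shared row is overwritten for each record, so all outputs alias it. */
    static List<MutableRow> convertReusing(List<Object[]> input) {
        List<MutableRow> out = new ArrayList<>();
        MutableRow shared = new MutableRow(null);
        for (Object[] record : input) {
            shared.values = record; // clobbers the row emitted in the previous iteration
            out.add(shared);
        }
        return out;
    }

    /** Fixed: a fresh row per record, so buffered rows stay independent. */
    static List<MutableRow> convertFresh(List<Object[]> input) {
        List<MutableRow> out = new ArrayList<>();
        for (Object[] record : input) {
            out.add(new MutableRow(record));
        }
        return out;
    }
}
```

Reuse is safe only when each row is fully consumed before the next one is produced. Any step that buffers rows, such as a sorter, breaks that assumption.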
[GitHub] carbondata issue #972: [CARBONDATA-1065] Added set command in carbon to upda...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/972 @mohammadshahidkhan But we had better follow the conventions of users who are familiar with Spark/Hive/HDFS. ---
[GitHub] carbondata issue #972: [CARBONDATA-1065] Added set command in carbon to upda...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/972 I think we had better use `SET carbon.load.sort.scope = LOCAL_SORT` instead of `SET SORT_SCOPE = LOCAL_SORT`. @mohammadshahidkhan, can you explain the reason for the current choice? cc @jackylk ---
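The argument above is that SET should take the same dotted property names users already know from Spark and Hive (for example `SET spark.sql.shuffle.partitions = 200`). The sketch below is a minimal, hypothetical parser, not CarbonData's. It splits on the first `=` and keeps the key verbatim, so `carbon.load.sort.scope` passes through unchanged.

```java
import java.util.AbstractMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of parsing "SET key = value"; keys are kept verbatim.
final class SetCommandParser {
    private SetCommandParser() {}

    static Map.Entry<String, String> parse(String statement) {
        String s = statement.trim();
        // Accept the SET keyword in any case, as SQL keywords usually are.
        if (!s.toUpperCase(Locale.ROOT).startsWith("SET ")) {
            throw new IllegalArgumentException("not a SET statement: " + statement);
        }
        String body = s.substring(4);
        int eq = body.indexOf('=');
        if (eq < 0) {
            throw new IllegalArgumentException("expected key = value: " + statement);
        }
        String key = body.substring(0, eq).trim();
        String value = body.substring(eq + 1).trim();
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }
}
```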
[GitHub] carbondata issue #1000: [CARBONDATA-1018] Add unsafe ColumnPage implementati...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/1000 LGTM ---
[GitHub] carbondata pull request #1000: [CARBONDATA-1018] Add unsafe ColumnPage imple...
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1000#discussion_r121876896 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java --- @@ -323,15 +340,8 @@ public double getDouble(int rowId) { return doubleData[rowId]; } - /** - * Get decimal value at rowId - */ - public byte[] getDecimalBytes(int rowId) { -return byteArrayData[rowId]; - } - public BigDecimal getDecimal(int rowId) { --- End diff -- Add a doc comment like the one on getDouble. ---
[GitHub] carbondata pull request #1000: [CARBONDATA-1018] Add unsafe ColumnPage imple...
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1000#discussion_r121875823 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java --- @@ -508,7 +567,7 @@ public static ColumnPage decompress(Compressor compressor, DataType dataType, } // input byte[] is LV encoded, this function can expand it into byte[][] - private static byte[][] deflatten(byte[] input) { + protected static byte[][] deflatten(byte[] input) { --- End diff -- Agree with erlu. ---
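As the diff comment says, `deflatten` expands an LV (length-value) encoded `byte[]` into `byte[][]`. The sketch below shows that round trip. It assumes each value is prefixed by a 4-byte big-endian `int` length, which is `ByteBuffer`'s default order. The length width in CarbonData's real encoding may differ, and `LvCodec` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical LV codec. Assumption: each value is prefixed by a 4-byte
// big-endian int length (ByteBuffer's default byte order).
final class LvCodec {
    private LvCodec() {}

    /** Concatenates values as [len][bytes][len][bytes]... */
    static byte[] flatten(byte[][] values) {
        int total = 0;
        for (byte[] v : values) {
            total += Integer.BYTES + v.length;
        }
        ByteBuffer buffer = ByteBuffer.allocate(total);
        for (byte[] v : values) {
            buffer.putInt(v.length);
            buffer.put(v);
        }
        return buffer.array();
    }

    /** Inverse of flatten: read a length, then that many bytes, until input is exhausted. */
    static byte[][] deflatten(byte[] input) {
        ByteBuffer buffer = ByteBuffer.wrap(input);
        List<byte[]> out = new ArrayList<>();
        while (buffer.hasRemaining()) {
            int length = buffer.getInt();
            byte[] value = new byte[length];
            buffer.get(value);
            out.add(value);
        }
        return out.toArray(new byte[0][]);
    }
}
```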
[GitHub] carbondata pull request #1000: [CARBONDATA-1018] Add unsafe ColumnPage imple...
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1000#discussion_r121874665 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java --- @@ -98,56 +117,54 @@ public static ColumnPage newPage(DataType dataType, int pageSize) { default: throw new RuntimeException("Unsupported data dataType: " + dataType); } -instance.stats = new ColumnPageStatsVO(dataType); -instance.nullBitSet = new BitSet(pageSize); return instance; } protected static ColumnPage newBytePage(byte[] byteData) { ColumnPage columnPage = new ColumnPage(BYTE, byteData.length); --- End diff -- If this is not supported, we had better add a note explaining why. ---
[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/910#discussion_r121315052 --- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortHelper.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.newflow.sort; + +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; +import org.apache.carbondata.processing.util.NonDictionaryUtil; + +public class SortHelper { --- End diff -- In `SortTempFileChunkHolder.getRowFromStream`, the data comes from a stream, so we can't reuse the function in `SortStepRowUtil`. ---
[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/910#discussion_r121306203 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSort.scala --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.load + +import java.util.Comparator + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.row.CarbonRow +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails +import org.apache.carbondata.processing.csvload.{CSVInputFormat, StringArrayWritable} +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder +import org.apache.carbondata.processing.sortandgroupby.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.spark.util.CommonUtil +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.rdd.NewHadoopRDD +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.execution.command.ExecutionErrors +import org.apache.spark.storage.StorageLevel + +/** + * Use sortBy operator in spark to load the data + */ +object GlobalSort { --- End diff -- Rename it to `DataLoadProcess`. ---
[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/910#discussion_r121305832 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortOperates.scala --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.load + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException +import org.apache.carbondata.core.datastore.row.CarbonRow +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.processing.csvload.StringArrayWritable +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder +import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException +import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl +import org.apache.carbondata.processing.newflow.sort.SortHelper +import org.apache.carbondata.processing.newflow.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl} +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters +import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory} +import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.Row +import org.apache.spark.util.LongAccumulator +import org.apache.spark.{SparkEnv, TaskContext} + +import scala.util.Random + +object GlobalSortOperates { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + def toStringArrayRow(row: StringArrayWritable, columnCount: Int): StringArrayRow = { +val outRow = new StringArrayRow(new Array[String](columnCount)) +outRow.setValues(row.get()) + } + + def toRDDIterator( + rows: Iterator[Row], + modelBroadcast: Broadcast[CarbonLoadModel]): Iterator[Array[AnyRef]] = { +new Iterator[Array[AnyRef]] { + val iter = new NewRddIterator(rows, modelBroadcast.value, TaskContext.get()) + + override def hasNext: Boolean = iter.hasNext + + override def next(): 
Array[AnyRef] = iter.next +} + } + + def inputFunc( + rows: Iterator[Array[AnyRef]], + index: Int, + currentLoadCount: Int, + modelBroadcast: Broadcast[CarbonLoadModel], + rowNumber: LongAccumulator): Iterator[CarbonRow] = { +val model: CarbonLoadModel = getModelCopy(index, currentLoadCount, modelBroadcast) +val conf = DataLoadProcessBuilder.createConfiguration(model) +val rowParser = new RowParserImpl(conf.getDataFields, conf) + +TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => + wrapException(e, model) +} + +new Iterator[CarbonRow] { + override def hasNext: Boolean = rows.hasNext + + override def next(): CarbonRow = { +rowNumber.add(1) --- End diff -- Done ---
[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/910#discussion_r121305557 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortOperates.scala --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.load + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException +import org.apache.carbondata.core.datastore.row.CarbonRow +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.processing.csvload.StringArrayWritable +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder +import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException +import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl +import org.apache.carbondata.processing.newflow.sort.SortHelper +import org.apache.carbondata.processing.newflow.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl} +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters +import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory} +import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.Row +import org.apache.spark.util.LongAccumulator +import org.apache.spark.{SparkEnv, TaskContext} + +import scala.util.Random + +object GlobalSortOperates { --- End diff -- Rename it to `DataLoadProcessorStep`. ---
[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/910#discussion_r121305190 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSort.scala --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.load + +import java.util.Comparator + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.row.CarbonRow +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails +import org.apache.carbondata.processing.csvload.{CSVInputFormat, StringArrayWritable} +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.processing.newflow.DataLoadProcessBuilder +import org.apache.carbondata.processing.sortandgroupby.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.spark.util.CommonUtil +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.spark.{SparkContext, TaskContext} +import org.apache.spark.rdd.NewHadoopRDD +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.execution.command.ExecutionErrors +import org.apache.spark.storage.StorageLevel + +/** + * Use sortBy operator in spark to load the data + */ +object GlobalSort { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + def loadDataUsingGlobalSort( + sc: SparkContext, + dataFrame: Option[DataFrame], + model: CarbonLoadModel, + currentLoadCount: Int): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = { +val originRDD = if (dataFrame.isDefined) { + dataFrame.get.rdd +} else { + // input data from files + val hadoopConfiguration = new Configuration() + CommonUtil.configureCSVInputFormat(hadoopConfiguration, model) + hadoopConfiguration.set(FileInputFormat.INPUT_DIR, model.getFactFilePath) + val columnCount = model.getCsvHeaderColumns.length + new NewHadoopRDD[NullWritable, StringArrayWritable]( +sc, 
+classOf[CSVInputFormat], +classOf[NullWritable], +classOf[StringArrayWritable], +hadoopConfiguration) +.map(x => GlobalSortOperates.toStringArrayRow(x._2, columnCount)) +} + +val modelBroadcast = sc.broadcast(model) +val partialSuccessAccum = sc.longAccumulator("Partial Success Accumulator") + +val inputStepRowNumber = sc.longAccumulator("Input Processor Accumulator") +val convertStepRowNumber = sc.longAccumulator("Convert Processor Accumulator") +val sortStepRowNumber = sc.longAccumulator("Sort Processor Accumulator") +val writeStepRowNumber = sc.longAccumulator("Write Processor Accumulator") + +// 1. Input +val inputRDD = originRDD.mapPartitions(rows => GlobalSortOperates.toRDDIterator(rows, modelBroadcast)) + .mapPartitionsWithIndex { case (index, rows) => +GlobalSortOperates.inputFunc(rows, index, currentLoadCount, modelBroadcast, inputStepRowNumber) + } + +// 2. Convert +val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) => + GlobalSortOperates.convertFunc(rows, index, currentLoadCount, modelBroadcast, partialSuccessAccum, +convertStepRowNumber) +}.filter(_ != null)// Filter the bad record + +// 3. Sort +val configuration = DataLoadProcessBuilder.createConfiguration(model) +val sortParameters = SortParameters.createSortP
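`loadDataUsingGlobalSort` above chains input, convert (with bad records filtered out as `null`), and sort, keeping one `LongAccumulator` per step. The sketch below is a single-process analogue of that flow, using Java streams and `AtomicLong` counters in place of RDD transformations and accumulators. All names are hypothetical, and an integer parse stands in for the real row conversion.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

// Hypothetical single-process analogue of the staged load: one counter per step,
// bad records become null in the convert step and are filtered before sorting.
final class LoadPipelineSketch {
    final AtomicLong inputRows = new AtomicLong();
    final AtomicLong convertRows = new AtomicLong();
    final AtomicLong sortRows = new AtomicLong();

    /** Stand-in for row conversion: parse an int, or return null for a bad record. */
    private Integer convert(String raw) {
        convertRows.incrementAndGet();
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            return null; // bad record, dropped by the filter below
        }
    }

    List<Integer> run(List<String> rawRows) {
        return rawRows.stream()
            .peek(r -> inputRows.incrementAndGet())   // 1. input
            .map(this::convert)                       // 2. convert
            .filter(v -> v != null)                   //    filter bad records
            .sorted()                                 // 3. sort
            .peek(v -> sortRows.incrementAndGet())
            .collect(Collectors.toList());            // 4. hand sorted rows to the writer
    }
}
```

Comparing the per-step counters (here 3 in, 3 converted, 2 sorted for the test input) is what makes a partially successful load visible.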
[GitHub] carbondata pull request #910: [WIP] Global sort by spark in load process
Github user watermen commented on a diff in the pull request: https://github.com/apache/carbondata/pull/910#discussion_r121304371 --- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.load + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.CarbonTableIdentifier +import org.apache.carbondata.processing.model.CarbonLoadModel +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException +import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger +import org.apache.spark.util.LongAccumulator + +object GlobalSortHelper { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + def badRecordsLogger(loadModel: CarbonLoadModel, badRecordsAccum: LongAccumulator): Unit = { +val key = new CarbonTableIdentifier(loadModel.getDatabaseName, loadModel.getTableName, null).getBadRecordLoggerKey +if (null != BadRecordsLogger.hasBadRecord(key)) { + LOGGER.error("Data Load is partially success for table " + loadModel.getTableName) + badRecordsAccum.add(1) +} else { + LOGGER.info("Data loading is successful for table " + loadModel.getTableName) +} + } + + def tryWithSafeFinally[T](tableName: String, block: => T) --- End diff -- Delete ---
[GitHub] carbondata issue #978: [CARBONDATA-1109] Acquire semaphore before submit a p...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/978 retest this please ---
[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/978 @ravipesala Thanks for your solution, PR updated. ---
[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/978

@ravipesala You can reproduce this case by adding some logging and running the sample.csv loading test case (TestDataLoadWithFileName). It is hard to reproduce, so I added a sleep in the Producer to simulate the time spent processing data.

![image](https://cloud.githubusercontent.com/assets/1400819/26669092/19e9535e-46df-11e7-9d02-c12b05bb4a90.png)
![image](https://cloud.githubusercontent.com/assets/1400819/26669073/0c56c64a-46df-11e7-8071-6ba2e7fa5785.png)

The log showing the lost page looks like this:

```
00:15:42 [Thread-65]###addDataToStore
00:15:42 [Thread-65]###addDataToStore
00:15:43 [Thread-73]###Put ---> isWriteAll:false index:1
00:15:44 [Thread-72]###Put ---> isWriteAll:false index:0
00:15:44 [Thread-71]###Get ---> isWriteAll:false index:0
00:15:44 [Thread-71]###Get ---> isWriteAll:false index:1
00:15:44 [Thread-65]###addDataToStore
00:15:44 [Thread-65]###addDataToStore
00:15:44 [Thread-65]###finish
00:15:46 [Thread-72]###Put ---> isWriteAll:false index:1
00:15:46 [Thread-72]###Put ---> isWriteAll:true index:0
00:15:46 [Thread-71]###Get ---> isWriteAll:true index:0 // The last page is not consumed at the end.
00:15:46 [Thread-71]###Get ---> isWriteAll:false index:1
00:15:47 [Thread-73]###Put ---> isWriteAll:false index:0
00:15:47 [Thread-71]###Get ---> isWriteAll:false index:0
```
---
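The race above is a producer/consumer handoff in which the consumer can act on an "end" signal before every page has been put. A minimal sketch of one way to keep that from happening, assuming several producer threads, a single consumer and a bounded blocking queue; the class `PageHandoff`, the `END` sentinel and the page representation are hypothetical and this is not the fix merged in PR #978. The end-of-stream marker is enqueued only after every producer has finished, so the consumer drains all pages before it stops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch (not CarbonData's writer code): several producers build
// "pages" concurrently and one consumer writes them. The end-of-stream marker is
// enqueued only after every producer has finished, so the consumer cannot stop
// before the last page has been taken.
class PageHandoff {
  // Sentinel page: compared by identity, never produced by a producer.
  private static final List<Integer> END = new ArrayList<>();

  static List<List<Integer>> run(int producers, int pagesPerProducer) {
    BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(2);
    List<List<Integer>> written = new ArrayList<>();

    // Consumer: keep taking pages until the sentinel arrives.
    Thread consumer = new Thread(() -> {
      try {
        while (true) {
          List<Integer> page = queue.take();
          if (page == END) {
            return;
          }
          written.add(page);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.setDaemon(true); // do not keep the JVM alive if a producer fails
    consumer.start();

    ExecutorService pool = Executors.newFixedThreadPool(producers);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int p = 0; p < producers; p++) {
        final int producerId = p;
        futures.add(pool.submit(() -> {
          for (int i = 0; i < pagesPerProducer; i++) {
            List<Integer> page = new ArrayList<>();
            page.add(producerId);
            page.add(i);
            queue.put(page); // blocks while the queue is full
          }
          return null;
        }));
      }
      // Wait for all producers (rethrowing any failure) before signalling the end.
      for (Future<?> f : futures) {
        f.get();
      }
      queue.put(END);
      consumer.join(); // join also makes the consumer's writes visible here
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
    return written;
  }
}
```

In the log above, a page flagged `isWriteAll:true` is taken while Thread-73 still has a page to put; tying the end signal to the completion of all producers, rather than to a flag carried by one page, avoids relying on that ordering.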
[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/978 @kumarvishal09 It is very hard to write a test case for this because it only happens occasionally. ---
[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/978 retest this please ---
[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/978 retest this please ---
[GitHub] carbondata issue #978: [CARBONDATA-1109] Cover the case when last page is no...
Github user watermen commented on the issue: https://github.com/apache/carbondata/pull/978 test it please ---
[GitHub] carbondata pull request #719: [WIP][CARBONDATA-844] Avoid to get useless spl...
Github user watermen closed the pull request at: https://github.com/apache/carbondata/pull/719 ---
[GitHub] incubator-carbondata pull request #863: [CARBONDATA-998] Don't request execu...
GitHub user watermen opened a pull request: https://github.com/apache/incubator-carbondata/pull/863

[CARBONDATA-998] Don't request executors when we use carbon distribution

In the current implementation, Carbon requests executors whenever it needs more of them. This causes the following problems:

1. Carbon interferes with Spark's scheduling by requesting and removing executors on its own, which is confusing.
2. Carbon invokes DistributionUtil.getDistinctNodesList and sleeps for some time to make sure the executors have been assigned, which wastes a lot of time.

We should make data scanning and scheduling independent of each other.

### In CarbonData layout

Make one task scan multiple splits, so that each task has a larger input and there are fewer tasks.

### In Spark layout (dynamic allocation is enabled)

Request executors when some resources have been released by other applications and the maximum number of executors has not yet been reached.

For now, I only invoke DistributionUtil.getNodeList instead of DistributionUtil.ensureExecutorsAndGetNodeList. We can discuss this idea here, and then I can remove the now-unused code related to DistributionUtil.ensureExecutorsAndGetNodeList.

cc @jackylk @QiangCai @kumarvishal09

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/watermen/incubator-carbondata CARBONDATA-998

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/863.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #863

commit 824d0f7790f5559d48be6b1878786bc384af51a9
Author: Yadong Qi <qiyadong2...@gmail.com>
Date: 2017-04-27T07:54:49Z

    Don't request executors when we use carbon distribution

---
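The "CarbonData layout" idea above (one task scans several splits, so tasks are larger and fewer) can be sketched as greedy packing of splits into tasks up to a target input size. A minimal sketch, assuming each split is represented only by its length in bytes and a hypothetical `targetBytes` threshold; the class `SplitPacker` is illustrative and does not model CarbonData's real split or block-distribution classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: greedily pack splits (represented here only by their sizes
// in bytes) into tasks so that each task reads at most about targetBytes, which
// yields fewer, larger tasks than one task per split.
class SplitPacker {
  static List<List<Long>> pack(List<Long> splitSizes, long targetBytes) {
    List<List<Long>> tasks = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (long size : splitSizes) {
      // Close the current task if adding this split would exceed the target.
      if (!current.isEmpty() && currentBytes + size > targetBytes) {
        tasks.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    // Emit the last, possibly partially filled, task.
    if (!current.isEmpty()) {
      tasks.add(current);
    }
    return tasks;
  }
}
```

For example, splits of 64, 64, 64, 64 and 32 bytes with a 128-byte target become three tasks instead of five. A split larger than the target still gets a task of its own, and a real scheduler would also have to respect data locality (which node holds each block), which this sketch ignores.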
[GitHub] incubator-carbondata issue #659: [CARBONDATA-781] Reuse SegmentProperties ob...
Github user watermen commented on the issue: https://github.com/apache/incubator-carbondata/pull/659 @jackylk @kumarvishal09 Can you review the code again? ---