[GitHub] spark issue #19683: [SPARK-21657][SQL] optimize explode quadratic memory con...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19683 **[Test build #85462 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85462/testReport)** for PR 19683 at commit [`9edd864`](https://github.com/apache/spark/commit/9edd864e6bd3bc1fce0f6c4d2b45620addb82514). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19683: [SPARK-21657][SQL] optimize explode quadratic memory con...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19683 Merged build finished. Test FAILed.
[GitHub] spark issue #19683: [SPARK-21657][SQL] optimize explode quadratic memory con...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19683 **[Test build #85461 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85461/testReport)** for PR 19683 at commit [`17db21e`](https://github.com/apache/spark/commit/17db21e69886e761cc3d3a0b8de730a0b6e7ad82). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19683: [SPARK-21657][SQL] optimize explode quadratic memory con...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19683 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85461/ Test FAILed.
[GitHub] spark issue #19683: [SPARK-21657][SQL] optimize explode quadratic memory con...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19683 **[Test build #85461 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85461/testReport)** for PR 19683 at commit [`17db21e`](https://github.com/apache/spark/commit/17db21e69886e761cc3d3a0b8de730a0b6e7ad82).
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158906785 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend * their output. * * @param generator the generator expression - * @param join when true, each output row is implicitly joined with the input tuple that produced - * it. + * @param unrequiredChildOutput each output row is implicitly joined with the relevant part from the --- End diff -- Will try to improve it.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user uzadude commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158906660 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala --- @@ -359,12 +359,12 @@ package object dsl { def generate( generator: Generator, -join: Boolean = false, +unrequiredChildOutput: Seq[Attribute] = Nil, --- End diff -- It looks like it's a mistake. I see it was only used in the test suites, and they all overrode it.
[GitHub] spark issue #20062: [SPARK-22892] [SQL] Simplify some estimation logic by us...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20062 **[Test build #85460 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85460/testReport)** for PR 20062 at commit [`c71d688`](https://github.com/apache/spark/commit/c71d68878fd52c24f9f2ac5fc4e95257368b22b8).
[GitHub] spark pull request #20030: [SPARK-10496][CORE] Efficient RDD cumulative sum
Github user zhengruifeng closed the pull request at: https://github.com/apache/spark/pull/20030
[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19977 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85454/ Test PASSed.
[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19977 Merged build finished. Test PASSed.
[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19977 **[Test build #85454 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85454/testReport)** for PR 19977 at commit [`1c94418`](https://github.com/apache/spark/commit/1c94418c3aa5fe6610914a88b3b2ef3919b56ac4). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator f...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19527#discussion_r158904223 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala --- @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.feature + +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkException +import org.apache.spark.annotation.Since +import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.attribute._ +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCols, HasOutputCols} +import org.apache.spark.ml.util._ +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions.{col, lit, udf} +import org.apache.spark.sql.types.{DoubleType, NumericType, StructField, StructType} + +/** Private trait for params and common methods for OneHotEncoderEstimator and OneHotEncoderModel */ +private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid +with HasInputCols with HasOutputCols { + + /** + * Param for how to handle invalid data. + * Options are 'keep' (invalid data presented as an extra categorical feature) or + * 'error' (throw an error). 
+ * Default: "error" + * @group param + */ + @Since("2.3.0") + override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid", +"How to handle invalid data " + +"Options are 'keep' (invalid data presented as an extra categorical feature) " + +"or error (throw an error).", + ParamValidators.inArray(OneHotEncoderEstimator.supportedHandleInvalids)) + + setDefault(handleInvalid, OneHotEncoderEstimator.ERROR_INVALID) + + /** + * Whether to drop the last category in the encoded vector (default: true) + * @group param + */ + @Since("2.3.0") + final val dropLast: BooleanParam = +new BooleanParam(this, "dropLast", "whether to drop the last category") + setDefault(dropLast -> true) + + /** @group getParam */ + @Since("2.3.0") + def getDropLast: Boolean = $(dropLast) + + protected def validateAndTransformSchema( + schema: StructType, dropLast: Boolean, keepInvalid: Boolean): StructType = { +val inputColNames = $(inputCols) +val outputColNames = $(outputCols) +val existingFields = schema.fields + +require(inputColNames.length == outputColNames.length, + s"The number of input columns ${inputColNames.length} must be the same as the number of " + +s"output columns ${outputColNames.length}.") + +// Input columns must be NumericType. +inputColNames.foreach(SchemaUtils.checkNumericType(schema, _)) + +// Prepares output columns with proper attributes by examining input columns. +val inputFields = $(inputCols).map(schema(_)) + +val outputFields = inputFields.zip(outputColNames).map { case (inputField, outputColName) => + OneHotEncoderCommon.transformOutputColumnSchema( +inputField, outputColName, dropLast, keepInvalid) +} +outputFields.foldLeft(schema) { case (newSchema, outputField) => + SchemaUtils.appendColumn(newSchema, outputField) +} + } +} + +/** + * A one-hot encoder that maps a column of category indices to a column of binary vectors, with + * at most a single one-value per row that indicates the input category index. 
+ * For example with 5 categories, an input value of 2.0 would map to an output vector of + * `[0.0, 0.0, 1.0, 0.0]`. + * The last category is not included by default (configurable via `dropLast`), + * because it makes the vector entries sum up to one, and hence linearly dependent. + * So an input value of 4.0 maps to `[0.0,
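The doc comment quoted above (cut off in the archive) describes how `dropLast` shortens the encoded vector. A minimal plain-Scala sketch of that behaviour, assuming dense vectors for readability; `OneHotSketch` and `oneHot` are hypothetical names for illustration and are not part of the `OneHotEncoderEstimator` API under review:

```scala
object OneHotSketch {
  /** Encodes a category index as a dense one-hot vector.
    * With dropLast = true the vector has numCategories - 1 entries, so the
    * last category is represented by a vector of all zeros. */
  def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
    require(index >= 0 && index < numCategories, s"index $index out of range")
    val size = if (dropLast) numCategories - 1 else numCategories
    // An index equal to `size` (the dropped last category) never matches any position.
    Vector.tabulate(size)(i => if (i == index) 1.0 else 0.0)
  }
}
```

With 5 categories, `OneHotSketch.oneHot(2, 5)` yields `Vector(0.0, 0.0, 1.0, 0.0)`, matching the example in the doc comment, and `OneHotSketch.oneHot(4, 5, dropLast = false)` keeps all five entries.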
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158902856 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala --- @@ -85,11 +86,20 @@ case class GenerateExec( val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitionsWithIndexInternal { (index, iter) => val generatorNullRow = new GenericInternalRow(generator.elementSchema.length) - val rows = if (join) { + val rows = if (requiredChildOutput.nonEmpty) { + +val pruneChildForResult: InternalRow => InternalRow = + if (unrequiredChildOutput.isEmpty) { +identity + } else { +UnsafeProjection.create(requiredChildOutput, child.output) + } + val joinedRow = new JoinedRow iter.flatMap { row => + // we should always set the left (child output) --- End diff -- `child output` -> `required child output`
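The diff above prunes each child row down to the required columns before joining it with the generator output. A minimal plain-Scala sketch of that idea, assuming rows are modelled as `Vector[Any]` and columns are identified by position; `GenerateSketch`, `pruneChild` and `explodeJoin` are hypothetical names for illustration, not Spark APIs:

```scala
object GenerateSketch {
  type Row = Vector[Any]

  /** Builds a function that keeps only the required column positions.
    * When nothing is unrequired the row is passed through unchanged,
    * mirroring the `identity` branch in the diff. */
  def pruneChild(numCols: Int, unrequired: Set[Int]): Row => Row =
    if (unrequired.isEmpty) {
      (row: Row) => row
    } else {
      val required = (0 until numCols).filterNot(unrequired).toVector
      (row: Row) => required.map(i => row(i))
    }

  /** Explodes the sequence stored at `arrayCol` and joins every element
    * with the pruned child row (the "left" side of the join). */
  def explodeJoin(
      rows: Iterator[Row],
      arrayCol: Int,
      unrequired: Set[Int],
      numCols: Int): Iterator[Row] = {
    val prune = pruneChild(numCols, unrequired)
    rows.flatMap { row =>
      val left = prune(row) // pruned once per input row, reused for every element
      row(arrayCol).asInstanceOf[Seq[Any]].iterator.map(elem => left :+ elem)
    }
  }
}
```

In this sketch, marking the array column itself as unrequired means each output row no longer carries a copy of the whole array, which is presumably the memory cost the PR title refers to.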
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158891945 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend * their output. * * @param generator the generator expression - * @param join when true, each output row is implicitly joined with the input tuple that produced - * it. + * @param unrequiredChildOutput each output row is implicitly joined with the relevant part from the --- End diff -- OK, but the param doc could be improved.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158892688 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala --- @@ -359,12 +359,12 @@ package object dsl { def generate( generator: Generator, -join: Boolean = false, +unrequiredChildOutput: Seq[Attribute] = Nil, --- End diff -- Previously, `join` was false by default. The default value of `unrequiredChildOutput` seems to contradict that previous usage. Should we stay consistent with the earlier behavior?
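The concern about the default value can be illustrated with a small plain-Scala sketch. It assumes, as the `GenerateExec` diff in this thread suggests, that the required child output is the child output minus `unrequiredChildOutput`; `OldGenerate` and `NewGenerate` are hypothetical stand-ins that model columns as strings, not the Catalyst operators:

```scala
object GenerateDefaults {
  // Hypothetical stand-in for the old signature: a Boolean switch, off by default.
  final case class OldGenerate(childOutput: Seq[String], join: Boolean = false) {
    def joinedChildOutput: Seq[String] = if (join) childOutput else Nil
  }

  // Hypothetical stand-in for the new signature: list the columns to drop.
  final case class NewGenerate(
      childOutput: Seq[String],
      unrequiredChildOutput: Seq[String] = Nil) {
    def joinedChildOutput: Seq[String] =
      childOutput.filterNot(c => unrequiredChildOutput.contains(c))
  }
}
```

Under these assumptions, `OldGenerate(Seq("a", "b"))` joins no child columns by default, while `NewGenerate(Seq("a", "b"))` joins all of them. Reproducing the old default in the new form would require passing the full child output as `unrequiredChildOutput`.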
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19222 ping @cloud-fan
[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server
Github user zuotingbing commented on the issue: https://github.com/apache/spark/pull/20029 Could you please check this PR? Thanks @liufengdb
[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20099 **[Test build #85459 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85459/testReport)** for PR 20099 at commit [`a171f29`](https://github.com/apache/spark/commit/a171f295be9daeb3eecd2c031ff55a80b0fbe303).
[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19977 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85451/ Test PASSed.
[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19977 Merged build finished. Test PASSed.
[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19977 **[Test build #85451 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85451/testReport)** for PR 19977 at commit [`179c6fd`](https://github.com/apache/spark/commit/179c6fdf261d3392d4d3477a68f7fde60d190435). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20099 **[Test build #85458 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85458/testReport)** for PR 20099 at commit [`2e0007d`](https://github.com/apache/spark/commit/2e0007d1f172ec280788285fcffb02ad362a4a98). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20099 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85458/ Test FAILed.
[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20099 Merged build finished. Test FAILed.
[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20099 **[Test build #85458 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85458/testReport)** for PR 20099 at commit [`2e0007d`](https://github.com/apache/spark/commit/2e0007d1f172ec280788285fcffb02ad362a4a98).
[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20096 Merged build finished. Test FAILed.
[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20096 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85455/ Test FAILed.
[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20096 **[Test build #85455 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85455/testReport)** for PR 20096 at commit [`bcaa694`](https://github.com/apache/spark/commit/bcaa694cd18f5a6df8815882d715a64fda4cc6d9). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20094 Merged build finished. Test PASSed.
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20094 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85453/ Test PASSed.
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20094 **[Test build #85453 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85453/testReport)** for PR 20094 at commit [`6a25d60`](https://github.com/apache/spark/commit/6a25d60a0e0a24194c1764e217d74d807056039c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20082: [SPARK-22897][CORE]: Expose stageAttemptId in TaskContex...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20082 **[Test build #85457 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85457/testReport)** for PR 20082 at commit [`72a3abf`](https://github.com/apache/spark/commit/72a3abf8110a13c3719b8d6a600edee509b36ae9).
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r158899707 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. 
+ */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var rows: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable columnVectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (rows != null) { + rows.close() + rows = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +rows = reader.rows(options) + } + + /** + * Set required schema and partition information. 
+ * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: StructType, + partitionValues: InternalRow): Unit = { +batch = orcSchema.createRowBatch(DEFAULT_SIZE) +assert(!batch.selectedInUse) +totalRowC
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r158899698 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,432 @@ (same quoted diff as in the first comment on this file above; truncated in the archive)
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r158899621 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,432 @@ (same quoted diff as in the first comment on this file above; truncated in the archive)
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20094 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85450/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20094 Merged build finished. Test FAILed.
[GitHub] spark issue #20091: [SPARK-22465][FOLLOWUP] Update the number of partitions ...
Github user mridulm commented on the issue: https://github.com/apache/spark/pull/20091 @jiangxb1987 I am not disagreeing with your hypothesis that default parallelism might not be optimal in all cases within an application (for example, when different RDDs in an application have widely varying cardinalities). Since spark.default.parallelism is an exposed interface which applications depend on, changing the semantics here would be a functional regression and would break an exposed contract in Spark. This is why we have the option of explicitly overriding the number of partitions when the default does not work well.
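The explicit override mentioned in the comment above can be sketched as follows. This is a minimal illustration, not code from the PR: the object name `ExplicitPartitionsSketch`, the sample data, the `local[2]` master, and the values `4` and `50` are all assumptions chosen for the example. It only relies on the public `reduceByKey(func, numPartitions)` overload and `getNumPartitions`.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ExplicitPartitionsSketch {
  // Returns (partitions chosen by the default partitioner, partitions requested explicitly).
  def partitionCounts(sc: SparkContext): (Int, Int) = {
    // Hypothetical data: 1000 key/value pairs spread over 20 input partitions.
    val pairs = sc.parallelize(1 to 1000, numSlices = 20).map(i => (i % 10, 1))

    // No partition count given: Spark falls back to its default partitioner.
    val byDefault = pairs.reduceByKey(_ + _)

    // Partition count given: the caller's value is used for this shuffle.
    val explicit = pairs.reduceByKey(_ + _, 50)

    (byDefault.getNumPartitions, explicit.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ExplicitPartitionsSketch")
      .set("spark.default.parallelism", "4") // illustrative value only
    val sc = new SparkContext(conf)
    try {
      val (defaultCount, explicitCount) = partitionCounts(sc)
      println(s"default: $defaultCount, explicit: $explicitCount")
    } finally {
      sc.stop()
    }
  }
}
```

The point of the sketch is that the second call does not depend on `spark.default.parallelism` at all, so an application can opt out of the default per operation without the default's semantics changing.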
[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158898080 --- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala --- @@ -150,6 +150,11 @@ abstract class TaskContext extends Serializable { */ def stageId(): Int + /** + * An ID that is unique to the stage attempt that this task belongs to. + */ + def stageAttemptId(): Int --- End diff -- Yeah, if we were defining `stageAttemptId` from scratch, I would go for `stageAttemptNumber`. However, `stageAttemptId` is already used elsewhere in the codebase, for example in [Task.scala](https://github.com/apache/spark/blob/ded6d27e4eb02e4530015a95794e6ed0586faaa7/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L55). I think it's more important to be consistent. However, I could update the comment to reflect the attempt number part if you wish.
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20094 **[Test build #85450 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85450/testReport)** for PR 20094 at commit [`cd39760`](https://github.com/apache/spark/commit/cd397605eaf81e3725396c715602734596a83ac3). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158897767 --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala --- @@ -42,6 +42,7 @@ import org.apache.spark.util._ */ private[spark] class TaskContextImpl( val stageId: Int, +val stageAttemptId: Int, --- End diff -- OK then.
[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158897401 --- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala --- @@ -150,6 +150,11 @@ abstract class TaskContext extends Serializable { */ def stageId(): Int + /** + * An ID that is unique to the stage attempt that this task belongs to. + */ + def stageAttemptId(): Int --- End diff -- I think we should call it `stageAttemptNumber` to be consistent with `taskAttemptNumber`. Also, let's follow the comment on `attemptNumber`.
[GitHub] spark pull request #20059: [SPARK-22648][K8s] Add documentation covering ini...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20059
[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20082#discussion_r158897297 --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala --- @@ -42,6 +42,7 @@ import org.apache.spark.util._ */ private[spark] class TaskContextImpl( val stageId: Int, +val stageAttemptId: Int, --- End diff -- It's kind of a code style standard: add `override` if the member is an override.
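The style point in the comment above can be illustrated with a small, Spark-free sketch. The classes `SketchTaskContext` and `SketchTaskContextImpl` are hypothetical stand-ins, not the real `TaskContext` and `TaskContextImpl`, and the abstract members are declared without parentheses to keep the example simple.

```scala
// Simplified stand-ins for illustration only; not Spark's actual classes.
abstract class SketchTaskContext {
  def stageId: Int
  def stageAttemptId: Int
}

// `override` is optional when a constructor `val` implements an abstract member,
// but writing it documents the intent and makes the compiler reject the
// declaration if the parent member is later renamed or removed.
class SketchTaskContextImpl(
    override val stageId: Int,
    override val stageAttemptId: Int)
  extends SketchTaskContext

object OverrideStyleDemo {
  def main(args: Array[String]): Unit = {
    val ctx: SketchTaskContext = new SketchTaskContextImpl(stageId = 3, stageAttemptId = 1)
    println(s"stage ${ctx.stageId}, attempt ${ctx.stageAttemptId}")
  }
}
```

Without the modifier the class still compiles, which is why this is a convention rather than a language requirement.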
[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19943 **[Test build #85456 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85456/testReport)** for PR 19943 at commit [`3e1d479`](https://github.com/apache/spark/commit/3e1d479196dfcb21e2d5f641a50c0b663b8247a1).
[GitHub] spark issue #20059: [SPARK-22648][K8s] Add documentation covering init conta...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/20059 Thanks! merging to master.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r158897045 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging { + /** + * ORC File Reader. + */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. 
+ */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var rows: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable columnVectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (rows != null) { + rows.close() + rows = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +rows = reader.rows(options) + } + + /** + * Set required schema and partition information. + * With this information, this creates ColumnarBatch with the full schema. 
+ */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: StructType, + partitionValues: InternalRow): Unit = { +batch = orcSchema.createRowBatch(OrcColumnarBatchReader.DEFAULT_SIZE) +totalRowCount = reader.getNumberOfRows +logDebug(s"totalRowCount = $totalRowCount") + +this.requiredSchema
[GitHub] spark pull request #19954: [SPARK-22757][Kubernetes] Enable use of remote de...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19954
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20094 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85452/ Test FAILed.
[GitHub] spark issue #19954: [SPARK-22757][Kubernetes] Enable use of remote dependenc...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19954 Thanks! merging to master.
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20094 **[Test build #85452 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85452/testReport)** for PR 20094 at commit [`8879870`](https://github.com/apache/spark/commit/887987015f6ec35cc0e25648f9f714e4b6bfa982). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20094 Merged build finished. Test FAILed.
[GitHub] spark pull request #20093: [SPARK-22909][SS]Move Structured Streaming v2 API...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20093
[GitHub] spark issue #20093: [SPARK-22909][SS]Move Structured Streaming v2 APIs to st...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20093 LGTM, merging to master!
[GitHub] spark pull request #20036: [SPARK-18016][SQL][FOLLOW-UP] Code Generation: Co...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20036
[GitHub] spark issue #20036: [SPARK-18016][SQL][FOLLOW-UP] Code Generation: Constant ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20036 thanks, merging to master!
[GitHub] spark issue #20091: [SPARK-22465][FOLLOWUP] Update the number of partitions ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/20091 The major concern is that `spark.default.parallelism` is usually set to a relatively small value, so if the safety check fails, the value of `defaultParallelism` can be even smaller than the number of partitions of the existing partitioner. This is the regression case I want to fix in this PR. A further issue is that we should rethink whether to rely on `defaultParallelism` to determine the numPartitions of the default partitioner at all, or whether the number of partitions should be determined completely dynamically by the upstream RDDs. The current same-as-defaultParallelism approach is really prone to causing OOM during the shuffle stage.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r158895420 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. 
+ */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var rows: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable columnVectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (rows != null) { + rows.close() + rows = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +rows = reader.rows(options) + } + + /** + * Set required schema and partition information. 
+ * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: StructType, + partitionValues: InternalRow): Unit = { +batch = orcSchema.createRowBatch(DEFAULT_SIZE) +assert(!batch.selectedInUse) +t
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r158895416 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. 
+ */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var rows: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable columnVectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (rows != null) { + rows.close() + rows = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +rows = reader.rows(options) + } + + /** + * Set required schema and partition information. 
+ * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: StructType, + partitionValues: InternalRow): Unit = { +batch = orcSchema.createRowBatch(DEFAULT_SIZE) +assert(!batch.selectedInUse) +t
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r158895321 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. 
+ */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var rows: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable columnVectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (rows != null) { + rows.close() + rows = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +rows = reader.rows(options) + } + + /** + * Set required schema and partition information. 
+ * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: StructType, + partitionValues: InternalRow): Unit = { +batch = orcSchema.createRowBatch(DEFAULT_SIZE) +assert(!batch.selectedInUse) +t
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r158895355

    --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
    @@ -0,0 +1,357 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive.orc
    +
    +import java.io.File
    +
    +import scala.util.Random
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.sql.{DataFrame, SparkSession}
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.{Benchmark, Utils}
    +
    +
    +/**
    + * Benchmark to measure ORC read performance.
    + *
    + * This is in `sql/hive` module in order to compare `sql/core` and `sql/hive` ORC data sources.
    + */
    +// scalastyle:off line.size.limit
    +object OrcReadBenchmark {
    +  val conf = new SparkConf()
    +  conf.set("orc.compression", "snappy")
    +
    +  private val spark = SparkSession.builder()
    +    .master("local[1]")
    +    .appName("OrcReadBenchmark")
    +    .config(conf)
    +    .getOrCreate()
    +
    +  // Set default configs. Individual cases will change them if necessary.
    +  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
    +
    +  def withTempPath(f: File => Unit): Unit = {
    +    val path = Utils.createTempDir()
    +    path.delete()
    +    try f(path) finally Utils.deleteRecursively(path)
    +  }
    +
    +  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
    +    try f finally tableNames.foreach(spark.catalog.dropTempView)
    +  }
    +
    +  private val NATIVE_ORC_FORMAT = "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat"
    +  private val HIVE_ORC_FORMAT = "org.apache.spark.sql.hive.orc.OrcFileFormat"
    +
    +  private def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = {
    +    val dirORC = dir.getCanonicalPath
    +
    +    if (partition.isDefined) {
    +      df.write.partitionBy(partition.get).orc(dirORC)
    +    } else {
    +      df.write.orc(dirORC)
    +    }
    +
    +    spark.read.format(NATIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("nativeOrcTable")
    +    spark.read.format(HIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("hiveOrcTable")
    +  }
    +
    +  def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
    +    val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values)
    +
    +    withTempPath { dir =>
    +      withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
    +        import spark.implicits._
    +        spark.range(values).map(_ => Random.nextLong).createOrReplaceTempView("t1")
    +
    +        prepareTable(dir, spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM t1"))
    +
    +        sqlBenchmark.addCase("Native ORC") { _ =>
    +          spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
    +        }
    +
    +        sqlBenchmark.addCase("Hive built-in ORC") { _ =>
    +          spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
    +        }
    +
    +        /*
    +        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.1
    +        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
    +
    +        SQL Single TINYINT Column Scan:          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    +
    +        Native ORC                                     132 /  138        119.4           8.4       1.0X
    +        Hive built-in ORC                             1328 / 1333         11.8          84.5       0.1X
    +
    +        SQL Single SMALLINT Column Scan:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    +
    +        Native ORC                                     178 /  188         88.2          11.3       1.0X
    +
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r158895273

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala ---
    @@ -0,0 +1,432 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.orc
    +
    +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit
    +import org.apache.orc._
    +import org.apache.orc.mapred.OrcInputFormat
    +import org.apache.orc.storage.ql.exec.vector._
    +import org.apache.orc.storage.serde2.io.HiveDecimalWritable
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.memory.MemoryMode
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.execution.vectorized._
    +import org.apache.spark.sql.types._
    +
    +
    +/**
    + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch.
    + */
    +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging {
    +  import OrcColumnarBatchReader._
    +
    +  /**
    +   * ORC File Reader.
    +   */
    +  private var reader: Reader = _
    +
    +  /**
    +   * Vectorized Row Batch.
    +   */
    +  private var batch: VectorizedRowBatch = _
    +
    +  /**
    +   * Requested Column IDs.
    +   */
    +  private var requestedColIds: Array[Int] = _
    +
    +  /**
    +   * Record reader from row batch.
    +   */
    +  private var rows: org.apache.orc.RecordReader = _
    +
    +  /**
    +   * Required Schema.
    +   */
    +  private var requiredSchema: StructType = _
    +
    +  /**
    +   * ColumnarBatch for vectorized execution by whole-stage codegen.
    +   */
    +  private var columnarBatch: ColumnarBatch = _
    +
    +  /**
    +   * Writable columnVectors of ColumnarBatch.
    +   */
    +  private var columnVectors: Seq[WritableColumnVector] = _
    +
    +  /**
    +   * The number of rows read and considered to be returned.
    +   */
    +  private var rowsReturned: Long = 0L
    +
    +  /**
    +   * Total number of rows.
    +   */
    +  private var totalRowCount: Long = 0L
    +
    +  override def getCurrentKey: Void = null
    +
    +  override def getCurrentValue: ColumnarBatch = columnarBatch
    +
    +  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
    +
    +  override def nextKeyValue(): Boolean = nextBatch()
    +
    +  override def close(): Unit = {
    +    if (columnarBatch != null) {
    +      columnarBatch.close()
    +      columnarBatch = null
    +    }
    +    if (rows != null) {
    +      rows.close()
    +      rows = null
    +    }
    +  }
    +
    +  /**
    +   * Initialize ORC file reader and batch record reader.
    +   * Please note that `setRequiredSchema` is needed to be called after this.
    +   */
    +  override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = {
    +    val fileSplit = inputSplit.asInstanceOf[FileSplit]
    +    val conf = taskAttemptContext.getConfiguration
    +    reader = OrcFile.createReader(
    +      fileSplit.getPath,
    +      OrcFile.readerOptions(conf)
    +        .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
    +        .filesystem(fileSplit.getPath.getFileSystem(conf)))
    +
    +    val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength)
    +    rows = reader.rows(options)
    +  }
    +
    +  /**
    +   * Set required schema and partition information.
    +   * With this information, this creates ColumnarBatch with the full schema.
    +   */
    +  def setRequiredSchema(
    +      orcSchema: TypeDescription,
    +      requestedColIds: Array[Int],
    +      resultSchema: StructType,
    +      requiredSchema: StructType,
    +      partitionValues: InternalRow): Unit = {
    +    batch = orcSchema.createRowBatch(DEFAULT_SIZE)
    +    assert(!batch.selectedInUse)
    +    t
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r158895242

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala ---
    @@ -0,0 +1,432 @@
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r158895185

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala ---
    @@ -0,0 +1,432 @@
[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20096

    **[Test build #85455 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85455/testReport)** for PR 20096 at commit [`bcaa694`](https://github.com/apache/spark/commit/bcaa694cd18f5a6df8815882d715a64fda4cc6d9).
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r158894878

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala ---
    @@ -0,0 +1,432 @@
[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20082#discussion_r158894808

    --- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
    @@ -42,6 +42,7 @@ import org.apache.spark.util._
      */
     private[spark] class TaskContextImpl(
         val stageId: Int,
    +    val stageAttemptId: Int,
    --- End diff --

    Will do. Would you tell me the difference or rationale?
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r158894770

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala ---
    @@ -0,0 +1,432 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.orc
    +
    +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit
    +import org.apache.orc._
    +import org.apache.orc.mapred.OrcInputFormat
    +import org.apache.orc.storage.ql.exec.vector._
    +import org.apache.orc.storage.serde2.io.HiveDecimalWritable
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.memory.MemoryMode
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.execution.vectorized._
    +import org.apache.spark.sql.types._
    +
    +
    +/**
    + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch.
    + */
    +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging {
    +  import OrcColumnarBatchReader._
    +
    +  /**
    +   * ORC File Reader.
    +   */
    +  private var reader: Reader = _
    +
    +  /**
    +   * Vectorized Row Batch.
    +   */
    +  private var batch: VectorizedRowBatch = _
    +
    +  /**
    +   * Requested Column IDs.
    +   */
    +  private var requestedColIds: Array[Int] = _
    +
    +  /**
    +   * Record reader from row batch.
    +   */
    +  private var rows: org.apache.orc.RecordReader = _
    +
    +  /**
    +   * Required Schema.
    +   */
    +  private var requiredSchema: StructType = _
    +
    +  /**
    +   * ColumnarBatch for vectorized execution by whole-stage codegen.
    +   */
    +  private var columnarBatch: ColumnarBatch = _
    +
    +  /**
    +   * Writable columnVectors of ColumnarBatch.
    --- End diff --

    Since it's `Seq[WritableColumnVector]`, I'll keep the current one.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r158894676

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala ---
    @@ -0,0 +1,432 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.orc
    +
    +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit
    +import org.apache.orc._
    +import org.apache.orc.mapred.OrcInputFormat
    +import org.apache.orc.storage.ql.exec.vector._
    +import org.apache.orc.storage.serde2.io.HiveDecimalWritable
    +
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.memory.MemoryMode
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.execution.vectorized._
    +import org.apache.spark.sql.types._
    +
    +
    +/**
    + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch.
    + */
    +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging {
    +  import OrcColumnarBatchReader._
    +
    +  /**
    +   * ORC File Reader.
    +   */
    +  private var reader: Reader = _
    +
    +  /**
    +   * Vectorized Row Batch.
    +   */
    +  private var batch: VectorizedRowBatch = _
    +
    +  /**
    +   * Requested Column IDs.
    +   */
    +  private var requestedColIds: Array[Int] = _
    +
    +  /**
    +   * Record reader from row batch.
    +   */
    +  private var rows: org.apache.orc.RecordReader = _
    --- End diff --

    Yep. It's renamed to `recordReader`.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r158894581

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala ---
    @@ -0,0 +1,432 @@
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r158894279

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala ---
    @@ -0,0 +1,432 @@
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r158894151 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. 
+ */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var rows: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable columnVectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (rows != null) { + rows.close() + rows = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +rows = reader.rows(options) + } + + /** + * Set required schema and partition information. 
+ * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: StructType, + partitionValues: InternalRow): Unit = { +batch = orcSchema.createRowBatch(DEFAULT_SIZE) +assert(!batch.selectedInUse) +t
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r158894114 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. 
+ */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var rows: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable columnVectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (rows != null) { + rows.close() + rows = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +rows = reader.rows(options) + } + + /** + * Set required schema and partition information. 
+ * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: StructType, + partitionValues: InternalRow): Unit = { +batch = orcSchema.createRowBatch(DEFAULT_SIZE) +assert(!batch.selectedInUse) +t
[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19943 Thank you for the review, @cloud-fan, @viirya, @kiszk, @HyukjinKwon, @henrify. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20100: [SPARK-22913][SQL] Improved Hive Partition Pruning
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20100 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20100: [SPARK-22913][SQL] Improved Hive Partition Prunin...
GitHub user ameent opened a pull request: https://github.com/apache/spark/pull/20100 [SPARK-22913][SQL] Improved Hive Partition Pruning Adding support for Timestamp and Fractional column types. Pruning of partitions on these types is gated behind config options that default to false, as it's not clear which Hive metastore implementations support predicates on these types of columns. The AWS Glue Catalog http://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html does support filters on timestamp and fractional columns, and pushing these filters down to it yields significant performance improvements in our use cases. As part of this change the Hive pruning suite is renamed (a TODO) and 2 ignored tests are added that will validate the functionality of partition pruning through integration tests. The tests are ignored since the integration test setup uses a Hive client that throws errors when it sees partition column filters on non-integral and non-string columns. Unit tests that validate filtering are also added, and these are active. ## What changes were proposed in this pull request? See https://issues.apache.org/jira/browse/SPARK-22913 This change addresses the JIRA. I'm looking for feedback on the change itself and whether the config values I added make sense. I was not able to find an official Hive specification of which filters a metastore needs to support and, as such, feel hesitant to turn on this behavior by default. Piggybacking on top of the "advancedPartitionPruning" option felt wrong because that config toggles whether "in (...)" queries are expanded into a series of "ors", and I don't want people to be forced to turn off that behavior alongside not pushing timestamp predicates. ## How was this patch tested? This change is tested via unit tests, modified integration tests (that are ignored) and manual tests on EMR 5.10 running against AWS Glue Catalog as the Hive metastore. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/ameent/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20100.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20100 commit 6b1d5dc8874bba7c707428818123ec63fd7e84f0 Author: Ameen Tayyebi Date: 2017-12-28T02:56:13Z [SPARK-22913][SQL] Improved Hive Partition Pruning --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
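A minimal sketch of the opt-in gating described in this pull request, written in plain Scala with no Spark dependency. The names `ColType`, `PruningConf`, `pushTimestamp`, `pushFractional` and `canPushDown` are hypothetical and are not the config keys or methods added by the patch; the sketch assumes only what the description states, namely that integral and string partition filters are already pushed down and that timestamp and fractional filters stay off unless explicitly enabled.

```scala
// Hypothetical stand-ins for partition column types (not Spark's DataType classes).
sealed trait ColType
case object IntegralCol extends ColType
case object StringCol extends ColType
case object TimestampCol extends ColType
case object FractionalCol extends ColType

// Hypothetical flags; both default to false, as the description proposes.
final case class PruningConf(pushTimestamp: Boolean = false, pushFractional: Boolean = false)

object PruningSketch {
  // Decide whether a predicate on a partition column of this type may be
  // pushed down to the metastore.
  def canPushDown(col: ColType, conf: PruningConf): Boolean = col match {
    case IntegralCol | StringCol => true              // assumed to be supported already
    case TimestampCol            => conf.pushTimestamp  // opt-in only
    case FractionalCol           => conf.pushFractional // opt-in only
  }
}
```

Keeping one flag per type, rather than reusing an existing option, mirrors the concern in the description that users should not have to give up one behavior to get the other.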
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r158893226 --- Diff: sql/core/src/test/resources/sql-tests/inputs/typeCoercion/native/concat.sql --- @@ -0,0 +1,93 @@ +-- Concatenate mixed inputs (output type is string) +SELECT (col1 || col2 || col3) col +FROM ( + SELECT +id col1, +string(id + 1) col2, +encode(string(id + 2), 'utf-8') col3 + FROM range(10) +); + +SELECT ((col1 || col2) || (col3 || col4) || col5) col +FROM ( + SELECT +'prefix_' col1, +id col2, +string(id + 1) col3, +encode(string(id + 2), 'utf-8') col4, +CAST(id AS DOUBLE) col5 + FROM range(10) +); + +SELECT ((col1 || col2) || (col3 || col4)) col --- End diff -- are these 3 cases testing the default value of `spark.sql.function.concatBinaryAsString`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r158893248 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging { --- End diff -- Thank you for the decision, @cloud-fan. In that case, I'll try to update the PR. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19979: [SPARK-22881][ML][TEST] ML regression package testsuite ...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19979 Actually, going further than what Bago said: All of the places which use globalCheckFunction assume that Dataset.collect() returns the Rows in a fixed order. We should really fix those unit tests to check values row-by-row. As a side effect, that would allow us to eliminate globalCheckFunction. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
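A minimal sketch of the row-by-row check suggested above, in plain Scala with no Spark dependency. `ScoredRow` and `sameRows` are hypothetical stand-ins and are not part of the ML test utilities; the sketch assumes every row carries a unique key, so values can be matched by key instead of relying on the order in which `Dataset.collect()` happens to return rows.

```scala
// Hypothetical stand-in for a collected row: a unique id plus a predicted value.
final case class ScoredRow(id: Int, prediction: Double)

object RowChecks {
  // Compare rows by key, so the result does not depend on collection order.
  def sameRows(actual: Seq[ScoredRow], expected: Seq[ScoredRow], tol: Double = 1e-9): Boolean = {
    val expectedById = expected.map(r => r.id -> r.prediction).toMap
    val idsAreUnique = actual.map(_.id).distinct.size == actual.size
    idsAreUnique &&
      actual.size == expected.size &&
      actual.forall { r =>
        // Each actual row must match the expected value stored under the same id.
        expectedById.get(r.id).exists(p => math.abs(p - r.prediction) <= tol)
      }
  }
}
```

A check of this shape passes for any permutation of the expected rows and fails when a value differs, which is the property a positional comparison of collected rows cannot guarantee.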
[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19977 **[Test build #85454 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85454/testReport)** for PR 19977 at commit [`1c94418`](https://github.com/apache/spark/commit/1c94418c3aa5fe6610914a88b3b2ef3919b56ac4). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20099 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20099 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85449/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20099 **[Test build #85449 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85449/testReport)** for PR 20099 at commit [`e4b63f5`](https://github.com/apache/spark/commit/e4b63f5fab81b7637d107efe6524b2f41c681a10). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r158892653 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -658,6 +660,33 @@ object TypeCoercion { } } + /** + * Coerces the types of [[Concat]] children to expected ones. + * + * If `spark.sql.function.concatBinaryAsString` is false and all children types are binary, + * the expected types are binary. Otherwise, the expected ones are strings. + */ + case class ConcatCoercion(conf: SQLConf) extends TypeCoercionRule { + +private def typeCastToString(c: Concat): Concat = { + val newChildren = c.children.map { e => +ImplicitTypeCasts.implicitCast(e, StringType).getOrElse(e) + } + c.copy(children = newChildren) +} + +override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transform { case p => + p transformExpressionsUp { +// Skip nodes if unresolved or empty children +case c @ Concat(children) if !c.childrenResolved || children.isEmpty => c + +case c @ Concat(children) if conf.concatBinaryAsString || +!children.map(_.dataType).forall(_ == BinaryType) => + typeCastToString(c) --- End diff -- ya, ok. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r158892598 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -658,6 +660,33 @@ object TypeCoercion { } } + /** + * Coerces the types of [[Concat]] children to expected ones. + * + * If `spark.sql.function.concatBinaryAsString` is false and all children types are binary, + * the expected types are binary. Otherwise, the expected ones are strings. + */ + case class ConcatCoercion(conf: SQLConf) extends TypeCoercionRule { + +private def typeCastToString(c: Concat): Concat = { + val newChildren = c.children.map { e => +ImplicitTypeCasts.implicitCast(e, StringType).getOrElse(e) + } + c.copy(children = newChildren) +} + +override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transform { case p => + p transformExpressionsUp { +// Skip nodes if unresolved or empty children +case c @ Concat(children) if !c.childrenResolved || children.isEmpty => c + +case c @ Concat(children) if conf.concatBinaryAsString || +!children.map(_.dataType).forall(_ == BinaryType) => + typeCastToString(c) --- End diff -- we can probably inline this method now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
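The decision made by the rule in the diff above can be sketched in plain Scala, without Catalyst. `SimpleType`, `BinaryT`, `StringT`, `LongT` and `expectedType` are hypothetical stand-ins and are not Spark classes; the sketch only mirrors the doc comment in the diff: children stay binary when `concatBinaryAsString` is false and every child is binary, and are cast to string otherwise.

```scala
// Simplified stand-ins for Catalyst data types (not Spark's real classes).
sealed trait SimpleType
case object BinaryT extends SimpleType
case object StringT extends SimpleType
case object LongT extends SimpleType

object ConcatTypeSketch {
  // Returns None when there are no children (the rule skips such nodes),
  // otherwise the type every child is expected to have after coercion.
  def expectedType(children: Seq[SimpleType], concatBinaryAsString: Boolean): Option[SimpleType] =
    if (children.isEmpty) None
    else if (concatBinaryAsString || !children.forall(_ == BinaryT)) Some(StringT)
    else Some(BinaryT)
}
```

With the flag off, an all-binary input yields `Some(BinaryT)`, while a mixed input such as binary plus long still yields `Some(StringT)`.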
[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20096 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85448/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20096 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20096 **[Test build #85448 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85448/testReport)** for PR 20096 at commit [`607b902`](https://github.com/apache/spark/commit/607b9026ff1c208dd3d1dd0052f42da5992289ed). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class ContinuousKafkaSuite extends KafkaSourceTest with SharedSQLContext ` * `class ContinuousKafkaStressSuite extends KafkaSourceTest with SharedSQLContext ` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20098: [SPARK-22914][DEPLOY] Register history.ui.port
Github user srowen commented on the issue: https://github.com/apache/spark/pull/20098 CC @vanzin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20094 **[Test build #85453 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85453/testReport)** for PR 20094 at commit [`6a25d60`](https://github.com/apache/spark/commit/6a25d60a0e0a24194c1764e217d74d807056039c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20094 **[Test build #85452 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85452/testReport)** for PR 20094 at commit [`8879870`](https://github.com/apache/spark/commit/887987015f6ec35cc0e25648f9f714e4b6bfa982). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20094 LGTM with two minor comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20094: [SPARK-20392][SQL][followup] should not add extra...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20094#discussion_r158891321 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1079,100 +1083,76 @@ class Analyzer( case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa case sa @ Sort(_, _, child: Aggregate) => sa - case s @ Sort(order, _, originalChild) if !s.resolved && originalChild.resolved => -val child = EliminateBarriers(originalChild) -try { - val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder]) - val requiredAttrs = AttributeSet(newOrder).filter(_.resolved) - val missingAttrs = requiredAttrs -- child.outputSet - if (missingAttrs.nonEmpty) { -// Add missing attributes and then project them away after the sort. -Project(child.output, - Sort(newOrder, s.global, addMissingAttr(child, missingAttrs))) - } else if (newOrder != order) { -s.copy(order = newOrder) - } else { -s - } -} catch { - // Attempting to resolve it might fail. When this happens, return the original plan. - // Users will see an AnalysisException for resolution failure of missing attributes - // in Sort - case ae: AnalysisException => s + case s @ Sort(order, _, child) if !s.resolved && child.resolved => +val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, child) +val ordering = newOrder.map(_.asInstanceOf[SortOrder]) +if (child.output == newChild.output) { + s.copy(order = ordering) +} else { + // Add missing attributes and then project them away. 
+ val newSort = s.copy(order = ordering, child = newChild) + Project(child.output, newSort) } - case f @ Filter(cond, originalChild) if !f.resolved && originalChild.resolved => -val child = EliminateBarriers(originalChild) -try { - val newCond = resolveExpressionRecursively(cond, child) - val requiredAttrs = newCond.references.filter(_.resolved) - val missingAttrs = requiredAttrs -- child.outputSet - if (missingAttrs.nonEmpty) { -// Add missing attributes and then project them away. -Project(child.output, - Filter(newCond, addMissingAttr(child, missingAttrs))) - } else if (newCond != cond) { -f.copy(condition = newCond) - } else { -f - } -} catch { - // Attempting to resolve it might fail. When this happens, return the original plan. - // Users will see an AnalysisException for resolution failure of missing attributes - case ae: AnalysisException => f + case f @ Filter(cond, child) if !f.resolved && child.resolved => +val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child) +if (child.output == newChild.output) { + f.copy(condition = newCond.head) +} else { + // Add missing attributes and then project them away. + val newFilter = Filter(newCond.head, newChild) + Project(child.output, newFilter) } } -/** - * Add the missing attributes into projectList of Project/Window or aggregateExpressions of - * Aggregate. 
- */ -private def addMissingAttr(plan: LogicalPlan, missingAttrs: AttributeSet): LogicalPlan = { - if (missingAttrs.isEmpty) { -return AnalysisBarrier(plan) - } - plan match { -case p: Project => - val missing = missingAttrs -- p.child.outputSet - Project(p.projectList ++ missingAttrs, addMissingAttr(p.child, missing)) -case a: Aggregate => - // all the missing attributes should be grouping expressions - // TODO: push down AggregateExpression - missingAttrs.foreach { attr => -if (!a.groupingExpressions.exists(_.semanticEquals(attr))) { - throw new AnalysisException(s"Can't add $attr to ${a.simpleString}") -} - } - val newAggregateExpressions = a.aggregateExpressions ++ missingAttrs - a.copy(aggregateExpressions = newAggregateExpressions) -case g: Generate => - // If join is false, we will convert it to true for getting from the child the missing - // attributes that its child might have or could have. - val missing = missingAttrs -- g.child
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19683#discussion_r158891168 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala --- @@ -227,4 +227,30 @@ class MiscBenchmark extends BenchmarkBase { generate stack wholestage on 836 / 847 20.1 49.8 15.5X */ } + + ignore("generate explode big struct array") { +val N = 6 + +val spark = sparkSession +import spark.implicits._ +import org.apache.spark.sql.functions._ + +val df = sparkSession.sparkContext.parallelize( --- End diff -- and put the result like other benchmarks in this file, e.g. ``` /* Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz generate stack: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative generate stack wholestage off 12953 / 13070 1.3 772.1 1.0X generate stack wholestage on 836 / 847 20.1 49.8 15.5X */ ``` You can run this benchmark without your PR and put the result in PR comment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158891075

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala ---
    @@ -227,4 +227,30 @@ class MiscBenchmark extends BenchmarkBase {
         generate stack wholestage on                   836 /  847         20.1          49.8      15.5X
          */
       }
    +
    +  ignore("generate explode big struct array") {
    +    val N = 6
    +
    +    val spark = sparkSession
    +    import spark.implicits._
    +    import org.apache.spark.sql.functions._
    +
    +    val df = sparkSession.sparkContext.parallelize(
    --- End diff --

    please make it a real benchmark
    ```
    val df = Seq("1", Array.fill(N)(i => (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString))).toDF("col", "arr")

    runBenchmark("generate big struct array", N) {
      df.withColumn("arr_col", explode('arr)).select("col", "arr_col.*").count
    }
    ```
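A note on the snippet suggested in the comment above: in the Scala standard library, `Array.fill(n)(elem)` takes a by-name element rather than a function of the index, so the `i => ...` form is probably best read as pseudocode; `Array.tabulate(n)(f)` is the index-based builder. Below is a minimal, Spark-free sketch of building such an array of four-field tuples. The object name `BigStructArraySketch` and the helper `bigStructArray` are hypothetical and are not part of the PR.

```scala
object BigStructArraySketch {
  // Hypothetical helper (not from the PR): builds n four-field "structs" as tuples.
  // Array.tabulate passes the index to the function; Array.fill would not.
  def bigStructArray(n: Int): Array[(String, String, String, String)] =
    Array.tabulate(n) { i =>
      (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
    }

  def main(args: Array[String]): Unit = {
    val n = 6 // same value as N in the diff under review
    val arr = bigStructArray(n)
    // Print each generated tuple so the shape of the data is visible.
    arr.foreach(println)
  }
}
```

In a Spark session one would presumably wrap the result in a single-row `Seq` of tuples, for example `Seq(("1", arr)).toDF("col", "arr")`, before timing the `explode`; that step is left out here to keep the sketch self-contained.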
[GitHub] spark issue #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nested expr...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19813 LGTM, great work!
[GitHub] spark pull request #20094: [SPARK-20392][SQL][followup] should not add extra...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20094#discussion_r158890342

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -1079,100 +1083,76 @@ class Analyzer(
           case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa
           case sa @ Sort(_, _, child: Aggregate) => sa
    -      case s @ Sort(order, _, originalChild) if !s.resolved && originalChild.resolved =>
    -        val child = EliminateBarriers(originalChild)
    -        try {
    -          val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
    -          val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
    -          val missingAttrs = requiredAttrs -- child.outputSet
    -          if (missingAttrs.nonEmpty) {
    -            // Add missing attributes and then project them away after the sort.
    -            Project(child.output,
    -              Sort(newOrder, s.global, addMissingAttr(child, missingAttrs)))
    -          } else if (newOrder != order) {
    -            s.copy(order = newOrder)
    -          } else {
    -            s
    -          }
    -        } catch {
    -          // Attempting to resolve it might fail. When this happens, return the original plan.
    -          // Users will see an AnalysisException for resolution failure of missing attributes
    -          // in Sort
    -          case ae: AnalysisException => s
    +      case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
    +        val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, child)
    +        val ordering = newOrder.map(_.asInstanceOf[SortOrder])
    +        if (child.output == newChild.output) {
    +          s.copy(order = ordering)
    +        } else {
    +          // Add missing attributes and then project them away.
    +          val newSort = s.copy(order = ordering, child = newChild)
    +          Project(child.output, newSort)
             }
    -      case f @ Filter(cond, originalChild) if !f.resolved && originalChild.resolved =>
    -        val child = EliminateBarriers(originalChild)
    -        try {
    -          val newCond = resolveExpressionRecursively(cond, child)
    -          val requiredAttrs = newCond.references.filter(_.resolved)
    -          val missingAttrs = requiredAttrs -- child.outputSet
    -          if (missingAttrs.nonEmpty) {
    -            // Add missing attributes and then project them away.
    -            Project(child.output,
    -              Filter(newCond, addMissingAttr(child, missingAttrs)))
    -          } else if (newCond != cond) {
    -            f.copy(condition = newCond)
    -          } else {
    -            f
    -          }
    -        } catch {
    -          // Attempting to resolve it might fail. When this happens, return the original plan.
    -          // Users will see an AnalysisException for resolution failure of missing attributes
    -          case ae: AnalysisException => f
    +      case f @ Filter(cond, child) if !f.resolved && child.resolved =>
    +        val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
    +        if (child.output == newChild.output) {
    +          f.copy(condition = newCond.head)
    +        } else {
    +          // Add missing attributes and then project them away.
    +          val newFilter = Filter(newCond.head, newChild)
    +          Project(child.output, newFilter)
             }
         }
    -    /**
    -     * Add the missing attributes into projectList of Project/Window or aggregateExpressions of
    -     * Aggregate.
    -     */
    -    private def addMissingAttr(plan: LogicalPlan, missingAttrs: AttributeSet): LogicalPlan = {
    -      if (missingAttrs.isEmpty) {
    -        return AnalysisBarrier(plan)
    -      }
    -      plan match {
    -        case p: Project =>
    -          val missing = missingAttrs -- p.child.outputSet
    -          Project(p.projectList ++ missingAttrs, addMissingAttr(p.child, missing))
    -        case a: Aggregate =>
    -          // all the missing attributes should be grouping expressions
    -          // TODO: push down AggregateExpression
    -          missingAttrs.foreach { attr =>
    -            if (!a.groupingExpressions.exists(_.semanticEquals(attr))) {
    -              throw new AnalysisException(s"Can't add $attr to ${a.simpleString}")
    -            }
    -          }
    -          val newAggregateExpressions = a.aggregateExpressions ++ missingAttrs
    -          a.copy(aggregateExpressions = newAggregateExpressions)
    -        case g: Generate =>
    -          // If join is false, we will convert it to true for getting from the child the missing
    -          // attributes that its child might have or could have.
    -          val missing = missingAttrs -- g.child.ou
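The rewritten `Sort` and `Filter` cases in the diff above share one shape: resolve the expression against a child widened with the missing attributes, then wrap the result in `Project(child.output, ...)` so the extra attributes do not leak into the output. The following is a toy, Spark-free sketch of that idea on plain Scala collections. `Row`, `sortByPossiblyMissing`, and the column names are hypothetical, and the sketch does not model Catalyst's actual `resolveExprsAndAddMissingAttrs`.

```scala
object ProjectAwaySketch {
  // Hypothetical stand-in for a row: column name -> value.
  type Row = Map[String, Int]

  // Sorts rows by sortCol even when sortCol is not in childOutput,
  // then drops the extra column so the result has exactly childOutput.
  def sortByPossiblyMissing(
      source: Seq[Row],
      childOutput: Seq[String],
      sortCol: String): Seq[Row] = {
    // Widen the child's output with the sort column only if it is missing.
    val widenedOutput =
      if (childOutput.contains(sortCol)) childOutput else childOutput :+ sortCol
    val widenedChild =
      source.map(row => row.filter { case (name, _) => widenedOutput.contains(name) })
    val sorted = widenedChild.sortBy(row => row(sortCol))
    if (widenedOutput == childOutput) {
      // Nothing was added, so no extra projection is needed.
      sorted
    } else {
      // "Project away" the attribute that was added only for sorting.
      sorted.map(row => row.filter { case (name, _) => childOutput.contains(name) })
    }
  }

  def main(args: Array[String]): Unit = {
    val source = Seq(Map("a" -> 1, "b" -> 3), Map("a" -> 2, "b" -> 1))
    // Sort by "b" while exposing only "a".
    println(sortByPossiblyMissing(source, Seq("a"), "b"))
  }
}
```

The comparison `widenedOutput == childOutput` plays the role of the `child.output == newChild.output` check in the diff: the extra projection is only added when the child actually had to be widened.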
[GitHub] spark pull request #20094: [SPARK-20392][SQL][followup] should not add extra...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20094#discussion_r158889013

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -723,7 +726,7 @@ class Analyzer(
                   s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
               }
             }
    -        AnalysisBarrier(newRight)
    +        newRight
    --- End diff --

    `newRight` was introduced earlier so that it could be wrapped in `AnalysisBarrier`. We can get rid of it now.
[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19683#discussion_r158890765

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala ---
    @@ -276,22 +276,24 @@ class PlanParserSuite extends AnalysisTest {
         assertEqual(
           "select * from t lateral view explode(x) expl as x",
           table("t")
    -        .generate(explode, join = true, outer = false, Some("expl"), Seq("x"))
    +        .generate(explode, alias = Some("expl"), outputNames = Seq("x"))
             .select(star()))
         // Multiple lateral views
    +    val exploded = table("t")
    +      .generate(explode, alias = Some("expl"))
    +
         assertEqual(
           """select *
             |from t
             |lateral view explode(x) expl
             |lateral view outer json_tuple(x, y) jtup q, z""".stripMargin,
    -      table("t")
    -        .generate(explode, join = true, outer = false, Some("expl"), Seq.empty)
    -        .generate(jsonTuple, join = true, outer = true, Some("jtup"), Seq("q", "z"))
    +      exploded
    +        .generate(jsonTuple, outer = true, alias = Some("jtup"), outputNames = Seq("q", "z"))
    --- End diff --

    and remove
    ```
    val exploded = table("t")
      .generate(explode, alias = Some("expl"))
    ```