[GitHub] spark issue #19683: [SPARK-21657][SQL] optimize explode quadratic memory con...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19683
  
**[Test build #85462 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85462/testReport)**
 for PR 19683 at commit 
[`9edd864`](https://github.com/apache/spark/commit/9edd864e6bd3bc1fce0f6c4d2b45620addb82514).


---




[GitHub] spark issue #19683: [SPARK-21657][SQL] optimize explode quadratic memory con...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19683
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19683: [SPARK-21657][SQL] optimize explode quadratic memory con...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19683
  
**[Test build #85461 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85461/testReport)**
 for PR 19683 at commit 
[`17db21e`](https://github.com/apache/spark/commit/17db21e69886e761cc3d3a0b8de730a0b6e7ad82).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19683: [SPARK-21657][SQL] optimize explode quadratic memory con...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19683
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85461/
Test FAILed.


---




[GitHub] spark issue #19683: [SPARK-21657][SQL] optimize explode quadratic memory con...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19683
  
**[Test build #85461 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85461/testReport)**
 for PR 19683 at commit 
[`17db21e`](https://github.com/apache/spark/commit/17db21e69886e761cc3d3a0b8de730a0b6e7ad82).


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158906785
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], 
child: LogicalPlan) extend
  * their output.
  *
  * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the 
input tuple that produced
- *  it.
+ * @param unrequiredChildOutput each output row is implicitly joined with 
the relevant part from the
--- End diff --

Will try to improve it.
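For context on what this PR is optimizing, here is a rough sketch of the kind of query that hits the quadratic memory blow-up (column names and sizes are illustrative; a SparkSession `spark` with `import spark.implicits._` is assumed):

```scala
import org.apache.spark.sql.functions.explode
import spark.implicits._

// A single row with a small key and one very large array column.
val df = Seq((1, (1 to 100000).toArray)).toDF("id", "arr")

// Keep a child column alongside the exploded values. Before this change, Generate
// joined the whole input row (including `arr` itself) to each of the 100000 output
// rows and only a later Project dropped it, so memory grew roughly as n * n.
df.withColumn("x", explode($"arr")).select("id", "x").count()
```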


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread uzadude
Github user uzadude commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158906660
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---
@@ -359,12 +359,12 @@ package object dsl {
 
   def generate(
 generator: Generator,
-join: Boolean = false,
+unrequiredChildOutput: Seq[Attribute] = Nil,
--- End diff --

It looks like it's a mistake. I see it was only used in the test suites, and they all override it.


---




[GitHub] spark issue #20062: [SPARK-22892] [SQL] Simplify some estimation logic by us...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20062
  
**[Test build #85460 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85460/testReport)**
 for PR 20062 at commit 
[`c71d688`](https://github.com/apache/spark/commit/c71d68878fd52c24f9f2ac5fc4e95257368b22b8).


---




[GitHub] spark pull request #20030: [SPARK-10496][CORE] Efficient RDD cumulative sum

2017-12-27 Thread zhengruifeng
Github user zhengruifeng closed the pull request at:

https://github.com/apache/spark/pull/20030


---




[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19977
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85454/
Test PASSed.


---




[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19977
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19977
  
**[Test build #85454 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85454/testReport)**
 for PR 19977 at commit 
[`1c94418`](https://github.com/apache/spark/commit/1c94418c3aa5fe6610914a88b3b2ef3919b56ac4).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19527: [SPARK-13030][ML] Create OneHotEncoderEstimator f...

2017-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19527#discussion_r158904223
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoderEstimator.scala 
---
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.Since
+import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.attribute._
+import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCols, 
HasOutputCols}
+import org.apache.spark.ml.util._
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.functions.{col, lit, udf}
+import org.apache.spark.sql.types.{DoubleType, NumericType, StructField, 
StructType}
+
+/** Private trait for params and common methods for OneHotEncoderEstimator 
and OneHotEncoderModel */
+private[ml] trait OneHotEncoderBase extends Params with HasHandleInvalid
+with HasInputCols with HasOutputCols {
+
+  /**
+   * Param for how to handle invalid data.
+   * Options are 'keep' (invalid data presented as an extra categorical 
feature) or
+   * 'error' (throw an error).
+   * Default: "error"
+   * @group param
+   */
+  @Since("2.3.0")
+  override val handleInvalid: Param[String] = new Param[String](this, 
"handleInvalid",
+"How to handle invalid data " +
+"Options are 'keep' (invalid data presented as an extra categorical 
feature) " +
+"or error (throw an error).",
+
ParamValidators.inArray(OneHotEncoderEstimator.supportedHandleInvalids))
+
+  setDefault(handleInvalid, OneHotEncoderEstimator.ERROR_INVALID)
+
+  /**
+   * Whether to drop the last category in the encoded vector (default: 
true)
+   * @group param
+   */
+  @Since("2.3.0")
+  final val dropLast: BooleanParam =
+new BooleanParam(this, "dropLast", "whether to drop the last category")
+  setDefault(dropLast -> true)
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getDropLast: Boolean = $(dropLast)
+
+  protected def validateAndTransformSchema(
+  schema: StructType, dropLast: Boolean, keepInvalid: Boolean): 
StructType = {
+val inputColNames = $(inputCols)
+val outputColNames = $(outputCols)
+val existingFields = schema.fields
+
+require(inputColNames.length == outputColNames.length,
+  s"The number of input columns ${inputColNames.length} must be the 
same as the number of " +
+s"output columns ${outputColNames.length}.")
+
+// Input columns must be NumericType.
+inputColNames.foreach(SchemaUtils.checkNumericType(schema, _))
+
+// Prepares output columns with proper attributes by examining input 
columns.
+val inputFields = $(inputCols).map(schema(_))
+
+val outputFields = inputFields.zip(outputColNames).map { case 
(inputField, outputColName) =>
+  OneHotEncoderCommon.transformOutputColumnSchema(
+inputField, outputColName, dropLast, keepInvalid)
+}
+outputFields.foldLeft(schema) { case (newSchema, outputField) =>
+  SchemaUtils.appendColumn(newSchema, outputField)
+}
+  }
+}
+
+/**
+ * A one-hot encoder that maps a column of category indices to a column of 
binary vectors, with
+ * at most a single one-value per row that indicates the input category 
index.
+ * For example with 5 categories, an input value of 2.0 would map to an 
output vector of
+ * `[0.0, 0.0, 1.0, 0.0]`.
+ * The last category is not included by default (configurable via 
`dropLast`),
+ * because it makes the vector entries sum up to one, and hence linearly 
dependent.
+ * So an input value of 4.0 maps to `[0.0, 0.0, 0.0, 0.0]`.
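(For readers skimming the thread, a minimal usage sketch of the estimator the quoted doc describes, based on the `setInputCols`/`setOutputCols` params shown above; the column names are illustrative and a SparkSession `spark` is assumed.)

```scala
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import spark.implicits._

val df = Seq(0.0, 1.0, 2.0, 3.0, 4.0).toDF("categoryIndex")

val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))

// With 5 categories and the default dropLast = true, 2.0 encodes to [0.0, 0.0, 1.0, 0.0]
// and 4.0 (the dropped last category) encodes to the all-zero vector.
encoder.fit(df).transform(df).show(false)
```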

[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158902856
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
@@ -85,11 +86,20 @@ case class GenerateExec(
 val numOutputRows = longMetric("numOutputRows")
 child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
   val generatorNullRow = new 
GenericInternalRow(generator.elementSchema.length)
-  val rows = if (join) {
+  val rows = if (requiredChildOutput.nonEmpty) {
+
+val pruneChildForResult: InternalRow => InternalRow =
+  if (unrequiredChildOutput.isEmpty) {
+identity
+  } else {
+UnsafeProjection.create(requiredChildOutput, child.output)
+  }
+
 val joinedRow = new JoinedRow
 iter.flatMap { row =>
+
   // we should always set the left (child output)
--- End diff --

`child output` -> `required child output`
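A condensed, self-contained sketch of the pruning being discussed in the hunk above (not the exact merged code):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}

// When nothing can be dropped, skip the projection entirely; otherwise shrink each
// input row to the attributes still required downstream before it is joined with
// the generator output.
def pruneChildForResult(
    requiredChildOutput: Seq[Attribute],
    unrequiredChildOutput: Seq[Attribute],
    childOutput: Seq[Attribute]): InternalRow => InternalRow =
  if (unrequiredChildOutput.isEmpty) identity
  else UnsafeProjection.create(requiredChildOutput, childOutput)
```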


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158891945
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -73,8 +73,10 @@ case class Project(projectList: Seq[NamedExpression], 
child: LogicalPlan) extend
  * their output.
  *
  * @param generator the generator expression
- * @param join  when true, each output row is implicitly joined with the 
input tuple that produced
- *  it.
+ * @param unrequiredChildOutput each output row is implicitly joined with 
the relevant part from the
--- End diff --

OK. But the param doc seems like it can be improved.


---




[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158892688
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---
@@ -359,12 +359,12 @@ package object dsl {
 
   def generate(
 generator: Generator,
-join: Boolean = false,
+unrequiredChildOutput: Seq[Attribute] = Nil,
--- End diff --

Previously `join` was false by default. This default `unrequiredChildOutput` value seems to contradict the previous usage. Should we keep it consistent with the old default?
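To frame the question, the old flag roughly corresponds to the new parameter as follows (a sketch against the DSL shown above; `testRelation` is a hypothetical plan with an array column `'arr`):

```scala
// Old signature:
testRelation.generate(Explode('arr), join = true)   // keep the whole child output
testRelation.generate(Explode('arr), join = false)  // drop the child output entirely

// New signature, same two cases:
testRelation.generate(Explode('arr), unrequiredChildOutput = Nil)                  // keep everything
testRelation.generate(Explode('arr), unrequiredChildOutput = testRelation.output)  // drop everything
```

So the new default `Nil` behaves like the old `join = true`, which is the inconsistency being raised here.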


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2017-12-27 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19222
  
ping @cloud-fan 


---




[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server

2017-12-27 Thread zuotingbing
Github user zuotingbing commented on the issue:

https://github.com/apache/spark/pull/20029
  
Could you please check this PR? Thanks @liufengdb


---




[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20099
  
**[Test build #85459 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85459/testReport)**
 for PR 20099 at commit 
[`a171f29`](https://github.com/apache/spark/commit/a171f295be9daeb3eecd2c031ff55a80b0fbe303).


---




[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19977
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85451/
Test PASSed.


---




[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19977
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19977
  
**[Test build #85451 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85451/testReport)**
 for PR 19977 at commit 
[`179c6fd`](https://github.com/apache/spark/commit/179c6fdf261d3392d4d3477a68f7fde60d190435).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20099
  
**[Test build #85458 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85458/testReport)**
 for PR 20099 at commit 
[`2e0007d`](https://github.com/apache/spark/commit/2e0007d1f172ec280788285fcffb02ad362a4a98).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20099
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85458/
Test FAILed.


---




[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20099
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20099
  
**[Test build #85458 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85458/testReport)**
 for PR 20099 at commit 
[`2e0007d`](https://github.com/apache/spark/commit/2e0007d1f172ec280788285fcffb02ad362a4a98).


---




[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20096
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20096
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85455/
Test FAILed.


---




[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20096
  
**[Test build #85455 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85455/testReport)**
 for PR 20096 at commit 
[`bcaa694`](https://github.com/apache/spark/commit/bcaa694cd18f5a6df8815882d715a64fda4cc6d9).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20094
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20094
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85453/
Test PASSed.


---




[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20094
  
**[Test build #85453 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85453/testReport)**
 for PR 20094 at commit 
[`6a25d60`](https://github.com/apache/spark/commit/6a25d60a0e0a24194c1764e217d74d807056039c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20082: [SPARK-22897][CORE]: Expose stageAttemptId in TaskContex...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20082
  
**[Test build #85457 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85457/testReport)**
 for PR 20082 at commit 
[`72a3abf`](https://github.com/apache/spark/commit/72a3abf8110a13c3719b8d6a600edee509b36ae9).


---




[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158899707
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+totalRowC

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158899698
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+totalRowC

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158899621
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+totalRowC

[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20094
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85450/
Test FAILed.


---




[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20094
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20091: [SPARK-22465][FOLLOWUP] Update the number of partitions ...

2017-12-27 Thread mridulm
Github user mridulm commented on the issue:

https://github.com/apache/spark/pull/20091
  
@jiangxb1987 I am not disagreeing with your hypothesis that default parallelism might not be optimal in all cases within an application (for example, when different RDDs in an application have widely varying cardinalities).

Since spark.default.parallelism is an exposed interface that applications depend on, changing the semantics here would be a regression in terms of functionality and would break an exposed contract in Spark.

This is why we have the option of explicitly overriding the number of partitions when the default does not work well.
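The explicit override mentioned above is just the `numPartitions` argument on the shuffle operators; a minimal sketch (the key and partition counts are illustrative):

```scala
import org.apache.spark.SparkContext

// sc is an existing SparkContext.
def keyCounts(sc: SparkContext) = {
  val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, 1))
  // Size the result partitioner explicitly instead of inheriting
  // spark.default.parallelism when the default is a poor fit for this dataset.
  pairs.reduceByKey(_ + _, numPartitions = 400)
}
```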


---




[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...

2017-12-27 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20082#discussion_r158898080
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -150,6 +150,11 @@ abstract class TaskContext extends Serializable {
*/
   def stageId(): Int
 
+  /**
+   * An ID that is unique to the stage attempt that this task belongs to.
+   */
+  def stageAttemptId(): Int
--- End diff --

Yeah, if we were defining `stageAttemptId` from scratch, I would go for `stageAttemptNumber`. However, `stageAttemptId` is already used elsewhere in the codebase, like in [Task.scala](https://github.com/apache/spark/blob/ded6d27e4eb02e4530015a95794e6ed0586faaa7/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L55). I think it's more important to be consistent.

However, I could update the comment to reflect the attempt-number part if you wish.
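For reference, the kind of task-side access this PR enables, sketched with the attempt-number naming discussed in this thread (a hypothetical helper, not code from the PR):

```scala
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

// Tag each record with stage/attempt info from inside a running task, e.g. to
// tell retries of the same stage apart in logs or debugging output.
def tagWithAttempt(rdd: RDD[Int]): RDD[(String, Int)] =
  rdd.mapPartitions { iter =>
    val ctx = TaskContext.get()
    val tag = s"stage=${ctx.stageId()} attempt=${ctx.stageAttemptNumber()} partition=${ctx.partitionId()}"
    iter.map(x => (tag, x))
  }
```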


---




[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20094
  
**[Test build #85450 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85450/testReport)**
 for PR 20094 at commit 
[`cd39760`](https://github.com/apache/spark/commit/cd397605eaf81e3725396c715602734596a83ac3).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...

2017-12-27 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20082#discussion_r158897767
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -42,6 +42,7 @@ import org.apache.spark.util._
  */
 private[spark] class TaskContextImpl(
 val stageId: Int,
+val stageAttemptId: Int,
--- End diff --

OK then.


---




[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20082#discussion_r158897401
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -150,6 +150,11 @@ abstract class TaskContext extends Serializable {
*/
   def stageId(): Int
 
+  /**
+   * An ID that is unique to the stage attempt that this task belongs to.
+   */
+  def stageAttemptId(): Int
--- End diff --

I think we should call it `stageAttemptNumber` to be consistent with `taskAttemptNumber`. Also, let's follow the comment of `attemptNumber`.


---




[GitHub] spark pull request #20059: [SPARK-22648][K8s] Add documentation covering ini...

2017-12-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20059


---




[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20082#discussion_r158897297
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -42,6 +42,7 @@ import org.apache.spark.util._
  */
 private[spark] class TaskContextImpl(
 val stageId: Int,
+val stageAttemptId: Int,
--- End diff --

it's kind of a code-style standard: add `override` if it is an override.


---




[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19943
  
**[Test build #85456 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85456/testReport)**
 for PR 19943 at commit 
[`3e1d479`](https://github.com/apache/spark/commit/3e1d479196dfcb21e2d5f641a50c0b663b8247a1).


---




[GitHub] spark issue #20059: [SPARK-22648][K8s] Add documentation covering init conta...

2017-12-27 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/20059
  
Thanks! merging to master.


---




[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158897045
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(OrcColumnarBatchReader.DEFAULT_SIZE)
+totalRowCount = reader.getNumberOfRows
+logDebug(s"totalRowCount = $totalRowCount")
+
+this.requiredSchema

[GitHub] spark pull request #19954: [SPARK-22757][Kubernetes] Enable use of remote de...

2017-12-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19954


---




[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20094
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85452/
Test FAILed.


---




[GitHub] spark issue #19954: [SPARK-22757][Kubernetes] Enable use of remote dependenc...

2017-12-27 Thread ueshin
Github user ueshin commented on the issue:

https://github.com/apache/spark/pull/19954
  
Thanks! merging to master.


---




[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20094
  
**[Test build #85452 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85452/testReport)**
 for PR 20094 at commit 
[`8879870`](https://github.com/apache/spark/commit/887987015f6ec35cc0e25648f9f714e4b6bfa982).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20094
  
Merged build finished. Test FAILed.


---




[GitHub] spark pull request #20093: [SPARK-22909][SS]Move Structured Streaming v2 API...

2017-12-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20093


---




[GitHub] spark issue #20093: [SPARK-22909][SS]Move Structured Streaming v2 APIs to st...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20093
  
LGTM, merging to master!


---




[GitHub] spark pull request #20036: [SPARK-18016][SQL][FOLLOW-UP] Code Generation: Co...

2017-12-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20036


---




[GitHub] spark issue #20036: [SPARK-18016][SQL][FOLLOW-UP] Code Generation: Constant ...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20036
  
thanks, merging to master!


---




[GitHub] spark issue #20091: [SPARK-22465][FOLLOWUP] Update the number of partitions ...

2017-12-27 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20091
  
The major concern is that `spark.default.parallelism` is usually set to a relatively small value, so if the safety check fails, `defaultParallelism` can even be smaller than the number of partitions of the existing partitioner; that is the regression case I want to fix in this PR.

A further issue is that we should rethink whether to rely on `defaultParallelism` to determine the numPartitions of the default partitioner at all, or whether the number of partitions should be determined completely dynamically by the upstream RDDs. The current same-as-defaultParallelism approach is quite prone to causing OOM during the shuffle stage.


---




[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158895420
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158895416
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158895321
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158895355
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.File
+
+import scala.util.Random
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure ORC read performance.
+ *
+ * This is in `sql/hive` module in order to compare `sql/core` and 
`sql/hive` ORC data sources.
+ */
+// scalastyle:off line.size.limit
+object OrcReadBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("OrcReadBenchmark")
+.config(conf)
+.getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  private val NATIVE_ORC_FORMAT = 
"org.apache.spark.sql.execution.datasources.orc.OrcFileFormat"
+  private val HIVE_ORC_FORMAT = 
"org.apache.spark.sql.hive.orc.OrcFileFormat"
+
+  private def prepareTable(dir: File, df: DataFrame, partition: 
Option[String] = None): Unit = {
+val dirORC = dir.getCanonicalPath
+
+if (partition.isDefined) {
+  df.write.partitionBy(partition.get).orc(dirORC)
+} else {
+  df.write.orc(dirORC)
+}
+
+
spark.read.format(NATIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("nativeOrcTable")
+
spark.read.format(HIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("hiveOrcTable")
+  }
+
+  def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
+val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column 
Scan", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+spark.range(values).map(_ => 
Random.nextLong).createOrReplaceTempView("t1")
+
+prepareTable(dir, spark.sql(s"SELECT CAST(value as 
${dataType.sql}) id FROM t1"))
+
+sqlBenchmark.addCase("Native ORC") { _ =>
+  spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+}
+
+sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+  spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.1
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+SQL Single TINYINT Column Scan:  Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------
+Native ORC                              132 /  138       119.4           8.4       1.0X
+Hive built-in ORC                      1328 / 1333        11.8          84.5       0.1X
+
+SQL Single SMALLINT Column Scan: Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
+----------------------------------------------------------------------------------------
+Native ORC                              178 /  188        88.2          11.3       1.0X
+
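(For context: the quoted cases follow the pattern below. This is only a minimal
sketch, assuming the `spark` session and the `nativeOrcTable`/`hiveOrcTable`
temp views registered by `prepareTable` above; it is not the exact code under
review.)

    import org.apache.spark.util.Benchmark

    // Assumes an existing SparkSession `spark` and the two temp views above.
    val values = 15 * 1024 * 1024
    val bench = new Benchmark("SQL Single BIGINT Column Scan", values)

    bench.addCase("Native ORC") { _ =>
      spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
    }
    bench.addCase("Hive built-in ORC") { _ =>
      spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
    }

    // Prints best/avg time, rate, per-row cost, and the relative speed of each case.
    bench.run()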

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158895273
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158895242
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158895185
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20096
  
**[Test build #85455 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85455/testReport)**
 for PR 20096 at commit 
[`bcaa694`](https://github.com/apache/spark/commit/bcaa694cd18f5a6df8815882d715a64fda4cc6d9).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158894878
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark pull request #20082: [SPARK-22897][CORE]: Expose stageAttemptId in Tas...

2017-12-27 Thread advancedxy
Github user advancedxy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20082#discussion_r158894808
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContextImpl.scala ---
@@ -42,6 +42,7 @@ import org.apache.spark.util._
  */
 private[spark] class TaskContextImpl(
 val stageId: Int,
+val stageAttemptId: Int,
--- End diff --

Will do.

Could you explain the difference, or the rationale behind it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158894770
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
--- End diff --

Since it's `Seq[WritableColumnVector]`, I'll keep the current one.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158894676
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
--- End diff --

Yep. It's renamed to `recordReader`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158894581
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158894279
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158894151
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158894114
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+t

[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19943
  
Thank you for the review, @cloud-fan, @viirya, @kiszk, @HyukjinKwon, @henrify.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20100: [SPARK-22913][SQL] Improved Hive Partition Pruning

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20100
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20100: [SPARK-22913][SQL] Improved Hive Partition Prunin...

2017-12-27 Thread ameent
GitHub user ameent opened a pull request:

https://github.com/apache/spark/pull/20100

[SPARK-22913][SQL] Improved Hive Partition Pruning

Adding support for Timestamp and Fractional column types. Pruning of
partitions with these types is gated behind new options that default to
false, as it's not clear which Hive metastore implementations support
predicates on these column types.

The AWS Glue Catalog
(http://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
does support filters on timestamp and fractional columns, and pushing these
filters down to it yields significant performance improvements in our use cases.

As part of this change the Hive pruning suite is renamed (an existing TODO) and
two ignored tests are added that validate partition pruning through integration
tests. The tests are ignored because the integration test setup uses a Hive
client that throws errors when it sees partition column filters on non-integral
and non-string columns.

Active unit tests are added to validate the filtering.

## What changes were proposed in this pull request?

See https://issues.apache.org/jira/browse/SPARK-22913

This change addresses the JIRA. I'm looking for feedback on the change itself
and on whether the config values I added make sense. I was not able to find an
official Hive specification of which filters a metastore needs to support, so I
am hesitant to turn this behavior on by default. Piggybacking on the existing
"advancedPartitionPruning" option felt wrong, because that config toggles
whether "in (...)" queries are expanded into a series of "or"s, and I don't
want people to be forced to turn off that behavior just to avoid pushing down
timestamp predicates.
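
As an illustration of the opt-in design (the config keys below are placeholders, not the names used in this patch):

```scala
import org.apache.spark.sql.SparkSession

// Placeholder config keys for illustration only; the PR defines the actual ones.
// The point is that timestamp/fractional pushdown is opt-in and independent of
// the existing "advancedPartitionPruning" behavior.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.metastorePartitionPruning.pushTimestamps", "true")   // placeholder
  .config("spark.sql.hive.metastorePartitionPruning.pushFractionals", "true")  // placeholder
  .getOrCreate()
```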

## How was this patch tested?

This change is tested via unit tests, modified integration tests (that are 
ignored) and manual tests on EMR 5.10 running against AWS Glue Catalog as the 
Hive metastore.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ameent/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20100.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20100


commit 6b1d5dc8874bba7c707428818123ec63fd7e84f0
Author: Ameen Tayyebi 
Date:   2017-12-28T02:56:13Z

[SPARK-22913][SQL] Improved Hive Partition Pruning

Adding support for Timestamp and Fractional column types. The pruning
of partitions of these types is being put behind default options
that are set to false, as it's not clear which hive metastore
implementations support predicates on these types of columns.

The AWS Glue Catalog 
http://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html
does support filters on timestamp and fractional columns and pushing these 
filters
down to it has significant performance improvements in our use cases.

As part of this change the hive pruning suite is renamed (a TODO) and 2
ignored tests are added that will validate the functionality of partition
pruning through integration tests. The tests are ignored since the 
integration
test setup uses a Hive client that throws errors when it sees partition 
column
filters on non-integral and non-string columns.

Unit tests are added to validate filtering, which are active.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19977#discussion_r158893226
  
--- Diff: 
sql/core/src/test/resources/sql-tests/inputs/typeCoercion/native/concat.sql ---
@@ -0,0 +1,93 @@
+-- Concatenate mixed inputs (output type is string)
+SELECT (col1 || col2 || col3) col
+FROM (
+  SELECT
+id col1,
+string(id + 1) col2,
+encode(string(id + 2), 'utf-8') col3
+  FROM range(10)
+);
+
+SELECT ((col1 || col2) || (col3 || col4) || col5) col
+FROM (
+  SELECT
+'prefix_' col1,
+id col2,
+string(id + 1) col3,
+encode(string(id + 2), 'utf-8') col4,
+CAST(id AS DOUBLE) col5
+  FROM range(10)
+);
+
+SELECT ((col1 || col2) || (col3 || col4)) col
--- End diff --

are these 3 cases testing the default value of 
`spark.sql.function.concatBinaryAsString`?
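
For reference, a quick way to see what this flag changes (a hedged sketch for the spark-shell, assuming a local `spark` session; `encode` is the same builtin the test file uses):

```scala
// With the flag false, an all-binary concat keeps BinaryType; with it true
// (or with any non-binary child), the result is coerced to StringType.
spark.conf.set("spark.sql.function.concatBinaryAsString", "false")
spark.sql("SELECT concat(encode('a', 'utf-8'), encode('b', 'utf-8')) AS c").schema  // c: binary

spark.conf.set("spark.sql.function.concatBinaryAsString", "true")
spark.sql("SELECT concat(encode('a', 'utf-8'), encode('b', 'utf-8')) AS c").schema  // c: string
```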


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2017-12-27 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r158893248
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
--- End diff --

Thank you for the decision, @cloud-fan. In that case, I'll try to update the PR.
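
For context, a minimal sketch of how a `RecordReader[Void, ColumnarBatch]` like this one is typically driven once `initialize` and `setRequiredSchema` have been called (hypothetical caller code, not part of this PR):

```scala
import org.apache.spark.sql.execution.vectorized.ColumnarBatch

// Drain the reader batch by batch and count the rows it produced.
def countRows(reader: OrcColumnarBatchReader): Long = {
  var rowCount = 0L
  while (reader.nextKeyValue()) {
    val batch: ColumnarBatch = reader.getCurrentValue
    rowCount += batch.numRows()
  }
  reader.close()
  rowCount
}
```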


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19979: [SPARK-22881][ML][TEST] ML regression package testsuite ...

2017-12-27 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/19979
  
Actually, going further than what Bago said: All of the places which use 
globalCheckFunction assume that Dataset.collect() returns the Rows in a fixed 
order.  We should really fix those unit tests to check values row-by-row.  As a 
side effect, that would allow us to eliminate globalCheckFunction.
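
A minimal sketch of the row-by-row style of check (the `id`/`prediction` column names and the helper are hypothetical, not the suite's actual API):

```scala
import org.apache.spark.sql.{DataFrame, Row}

// Key each row by its "id" column so the comparison does not depend on the
// order in which collect() happens to return rows.
def checkPredictionsById(result: DataFrame, expected: Map[Long, Double], tol: Double = 1e-6): Unit = {
  result.select("id", "prediction").collect().foreach {
    case Row(id: Long, prediction: Double) =>
      assert(math.abs(prediction - expected(id)) <= tol,
        s"row $id: got $prediction, expected ${expected(id)}")
  }
}
```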


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19977
  
**[Test build #85454 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85454/testReport)**
 for PR 19977 at commit 
[`1c94418`](https://github.com/apache/spark/commit/1c94418c3aa5fe6610914a88b3b2ef3919b56ac4).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20099
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20099
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85449/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20099: [SPARK-22916][SQL] shouldn't bias towards build right if...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20099
  
**[Test build #85449 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85449/testReport)**
 for PR 20099 at commit 
[`e4b63f5`](https://github.com/apache/spark/commit/e4b63f5fab81b7637d107efe6524b2f41c681a10).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...

2017-12-27 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19977#discussion_r158892653
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -658,6 +660,33 @@ object TypeCoercion {
 }
   }
 
+  /**
+   * Coerces the types of [[Concat]] children to expected ones.
+   *
+   * If `spark.sql.function.concatBinaryAsString` is false and all 
children types are binary,
+   * the expected types are binary. Otherwise, the expected ones are 
strings.
+   */
+  case class ConcatCoercion(conf: SQLConf) extends TypeCoercionRule {
+
+private def typeCastToString(c: Concat): Concat = {
+  val newChildren = c.children.map { e =>
+ImplicitTypeCasts.implicitCast(e, StringType).getOrElse(e)
+  }
+  c.copy(children = newChildren)
+}
+
+override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = 
plan transform { case p =>
+  p transformExpressionsUp {
+// Skip nodes if unresolved or empty children
+case c @ Concat(children) if !c.childrenResolved || 
children.isEmpty => c
+
+case c @ Concat(children) if conf.concatBinaryAsString ||
+!children.map(_.dataType).forall(_ == BinaryType) =>
+  typeCastToString(c)
--- End diff --

ya, ok.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19977#discussion_r158892598
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -658,6 +660,33 @@ object TypeCoercion {
 }
   }
 
+  /**
+   * Coerces the types of [[Concat]] children to expected ones.
+   *
+   * If `spark.sql.function.concatBinaryAsString` is false and all 
children types are binary,
+   * the expected types are binary. Otherwise, the expected ones are 
strings.
+   */
+  case class ConcatCoercion(conf: SQLConf) extends TypeCoercionRule {
+
+private def typeCastToString(c: Concat): Concat = {
+  val newChildren = c.children.map { e =>
+ImplicitTypeCasts.implicitCast(e, StringType).getOrElse(e)
+  }
+  c.copy(children = newChildren)
+}
+
+override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = 
plan transform { case p =>
+  p transformExpressionsUp {
+// Skip nodes if unresolved or empty children
+case c @ Concat(children) if !c.childrenResolved || 
children.isEmpty => c
+
+case c @ Concat(children) if conf.concatBinaryAsString ||
+!children.map(_.dataType).forall(_ == BinaryType) =>
+  typeCastToString(c)
--- End diff --

we can probably inline this method now.
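
For concreteness, the inlined form would look roughly like this (a sketch assembled from the diff above, not the final patch):

```scala
case c @ Concat(children) if conf.concatBinaryAsString ||
    !children.map(_.dataType).forall(_ == BinaryType) =>
  val newChildren = c.children.map { e =>
    ImplicitTypeCasts.implicitCast(e, StringType).getOrElse(e)
  }
  c.copy(children = newChildren)
```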


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20096
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85448/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...

2017-12-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20096
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20096: [SPARK-22908] Add kafka source and sink for continuous p...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20096
  
**[Test build #85448 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85448/testReport)**
 for PR 20096 at commit 
[`607b902`](https://github.com/apache/spark/commit/607b9026ff1c208dd3d1dd0052f42da5992289ed).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class ContinuousKafkaSuite extends KafkaSourceTest with 
SharedSQLContext `
  * `class ContinuousKafkaStressSuite extends KafkaSourceTest with 
SharedSQLContext `


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20098: [SPARK-22914][DEPLOY] Register history.ui.port

2017-12-27 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/20098
  
CC @vanzin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20094
  
**[Test build #85453 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85453/testReport)**
 for PR 20094 at commit 
[`6a25d60`](https://github.com/apache/spark/commit/6a25d60a0e0a24194c1764e217d74d807056039c).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20094
  
**[Test build #85452 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85452/testReport)**
 for PR 20094 at commit 
[`8879870`](https://github.com/apache/spark/commit/887987015f6ec35cc0e25648f9f714e4b6bfa982).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20094: [SPARK-20392][SQL][followup] should not add extra Analys...

2017-12-27 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20094
  
LGTM with two minor comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20094: [SPARK-20392][SQL][followup] should not add extra...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20094#discussion_r158891321
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1079,100 +1083,76 @@ class Analyzer(
   case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa
   case sa @ Sort(_, _, child: Aggregate) => sa
 
-  case s @ Sort(order, _, originalChild) if !s.resolved && 
originalChild.resolved =>
-val child = EliminateBarriers(originalChild)
-try {
-  val newOrder = order.map(resolveExpressionRecursively(_, 
child).asInstanceOf[SortOrder])
-  val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
-  val missingAttrs = requiredAttrs -- child.outputSet
-  if (missingAttrs.nonEmpty) {
-// Add missing attributes and then project them away after the 
sort.
-Project(child.output,
-  Sort(newOrder, s.global, addMissingAttr(child, 
missingAttrs)))
-  } else if (newOrder != order) {
-s.copy(order = newOrder)
-  } else {
-s
-  }
-} catch {
-  // Attempting to resolve it might fail. When this happens, 
return the original plan.
-  // Users will see an AnalysisException for resolution failure of 
missing attributes
-  // in Sort
-  case ae: AnalysisException => s
+  case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
+val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, 
child)
+val ordering = newOrder.map(_.asInstanceOf[SortOrder])
+if (child.output == newChild.output) {
+  s.copy(order = ordering)
+} else {
+  // Add missing attributes and then project them away.
+  val newSort = s.copy(order = ordering, child = newChild)
+  Project(child.output, newSort)
 }
 
-  case f @ Filter(cond, originalChild) if !f.resolved && 
originalChild.resolved =>
-val child = EliminateBarriers(originalChild)
-try {
-  val newCond = resolveExpressionRecursively(cond, child)
-  val requiredAttrs = newCond.references.filter(_.resolved)
-  val missingAttrs = requiredAttrs -- child.outputSet
-  if (missingAttrs.nonEmpty) {
-// Add missing attributes and then project them away.
-Project(child.output,
-  Filter(newCond, addMissingAttr(child, missingAttrs)))
-  } else if (newCond != cond) {
-f.copy(condition = newCond)
-  } else {
-f
-  }
-} catch {
-  // Attempting to resolve it might fail. When this happens, 
return the original plan.
-  // Users will see an AnalysisException for resolution failure of 
missing attributes
-  case ae: AnalysisException => f
+  case f @ Filter(cond, child) if !f.resolved && child.resolved =>
+val (newCond, newChild) = 
resolveExprsAndAddMissingAttrs(Seq(cond), child)
+if (child.output == newChild.output) {
+  f.copy(condition = newCond.head)
+} else {
+  // Add missing attributes and then project them away.
+  val newFilter = Filter(newCond.head, newChild)
+  Project(child.output, newFilter)
 }
 }
 
-/**
- * Add the missing attributes into projectList of Project/Window or 
aggregateExpressions of
- * Aggregate.
- */
-private def addMissingAttr(plan: LogicalPlan, missingAttrs: 
AttributeSet): LogicalPlan = {
-  if (missingAttrs.isEmpty) {
-return AnalysisBarrier(plan)
-  }
-  plan match {
-case p: Project =>
-  val missing = missingAttrs -- p.child.outputSet
-  Project(p.projectList ++ missingAttrs, addMissingAttr(p.child, 
missing))
-case a: Aggregate =>
-  // all the missing attributes should be grouping expressions
-  // TODO: push down AggregateExpression
-  missingAttrs.foreach { attr =>
-if (!a.groupingExpressions.exists(_.semanticEquals(attr))) {
-  throw new AnalysisException(s"Can't add $attr to 
${a.simpleString}")
-}
-  }
-  val newAggregateExpressions = a.aggregateExpressions ++ 
missingAttrs
-  a.copy(aggregateExpressions = newAggregateExpressions)
-case g: Generate =>
-  // If join is false, we will convert it to true for getting from 
the child the missing
-  // attributes that its child might have or could have.
-  val missing = missingAttrs -- g.child

[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158891168
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala
 ---
@@ -227,4 +227,30 @@ class MiscBenchmark extends BenchmarkBase {
 generate stack wholestage on              836 /  847         20.1          49.8      15.5X
  */
   }
+
+  ignore("generate explode big struct array") {
+val N = 6
+
+val spark = sparkSession
+import spark.implicits._
+import org.apache.spark.sql.functions._
+
+val df = sparkSession.sparkContext.parallelize(
--- End diff --

and put the result like other benchmarks in this file, e.g.
```
/*
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6
 Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz
 
 generate stack:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------
 generate stack wholestage off            12953 / 13070          1.3         772.1       1.0X
 generate stack wholestage on               836 /   847         20.1          49.8      15.5X
   */
```
You can run this benchmark without your PR and put the result in PR comment


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158891075
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala
 ---
@@ -227,4 +227,30 @@ class MiscBenchmark extends BenchmarkBase {
 generate stack wholestage on              836 /  847         20.1          49.8      15.5X
  */
   }
+
+  ignore("generate explode big struct array") {
+val N = 6
+
+val spark = sparkSession
+import spark.implicits._
+import org.apache.spark.sql.functions._
+
+val df = sparkSession.sparkContext.parallelize(
--- End diff --

please make it a real benchmark
```
val df = Seq(("1", Array.tabulate(N)(i =>
  (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)))).toDF("col", "arr")
runBenchmark("generate big struct array", N) {
  df.withColumn("arr_col", explode('arr)).select("col", "arr_col.*").count
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19813: [SPARK-22600][SQL] Fix 64kb limit for deeply nested expr...

2017-12-27 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/19813
  
LGTM, great work!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20094: [SPARK-20392][SQL][followup] should not add extra...

2017-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20094#discussion_r158890342
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -1079,100 +1083,76 @@ class Analyzer(
   case sa @ Sort(_, _, AnalysisBarrier(child: Aggregate)) => sa
   case sa @ Sort(_, _, child: Aggregate) => sa
 
-  case s @ Sort(order, _, originalChild) if !s.resolved && 
originalChild.resolved =>
-val child = EliminateBarriers(originalChild)
-try {
-  val newOrder = order.map(resolveExpressionRecursively(_, 
child).asInstanceOf[SortOrder])
-  val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
-  val missingAttrs = requiredAttrs -- child.outputSet
-  if (missingAttrs.nonEmpty) {
-// Add missing attributes and then project them away after the 
sort.
-Project(child.output,
-  Sort(newOrder, s.global, addMissingAttr(child, 
missingAttrs)))
-  } else if (newOrder != order) {
-s.copy(order = newOrder)
-  } else {
-s
-  }
-} catch {
-  // Attempting to resolve it might fail. When this happens, 
return the original plan.
-  // Users will see an AnalysisException for resolution failure of 
missing attributes
-  // in Sort
-  case ae: AnalysisException => s
+  case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
+val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, 
child)
+val ordering = newOrder.map(_.asInstanceOf[SortOrder])
+if (child.output == newChild.output) {
+  s.copy(order = ordering)
+} else {
+  // Add missing attributes and then project them away.
+  val newSort = s.copy(order = ordering, child = newChild)
+  Project(child.output, newSort)
 }
 
-  case f @ Filter(cond, originalChild) if !f.resolved && 
originalChild.resolved =>
-val child = EliminateBarriers(originalChild)
-try {
-  val newCond = resolveExpressionRecursively(cond, child)
-  val requiredAttrs = newCond.references.filter(_.resolved)
-  val missingAttrs = requiredAttrs -- child.outputSet
-  if (missingAttrs.nonEmpty) {
-// Add missing attributes and then project them away.
-Project(child.output,
-  Filter(newCond, addMissingAttr(child, missingAttrs)))
-  } else if (newCond != cond) {
-f.copy(condition = newCond)
-  } else {
-f
-  }
-} catch {
-  // Attempting to resolve it might fail. When this happens, 
return the original plan.
-  // Users will see an AnalysisException for resolution failure of 
missing attributes
-  case ae: AnalysisException => f
+  case f @ Filter(cond, child) if !f.resolved && child.resolved =>
+val (newCond, newChild) = 
resolveExprsAndAddMissingAttrs(Seq(cond), child)
+if (child.output == newChild.output) {
+  f.copy(condition = newCond.head)
+} else {
+  // Add missing attributes and then project them away.
+  val newFilter = Filter(newCond.head, newChild)
+  Project(child.output, newFilter)
 }
 }
 
-/**
- * Add the missing attributes into projectList of Project/Window or 
aggregateExpressions of
- * Aggregate.
- */
-private def addMissingAttr(plan: LogicalPlan, missingAttrs: 
AttributeSet): LogicalPlan = {
-  if (missingAttrs.isEmpty) {
-return AnalysisBarrier(plan)
-  }
-  plan match {
-case p: Project =>
-  val missing = missingAttrs -- p.child.outputSet
-  Project(p.projectList ++ missingAttrs, addMissingAttr(p.child, 
missing))
-case a: Aggregate =>
-  // all the missing attributes should be grouping expressions
-  // TODO: push down AggregateExpression
-  missingAttrs.foreach { attr =>
-if (!a.groupingExpressions.exists(_.semanticEquals(attr))) {
-  throw new AnalysisException(s"Can't add $attr to 
${a.simpleString}")
-}
-  }
-  val newAggregateExpressions = a.aggregateExpressions ++ 
missingAttrs
-  a.copy(aggregateExpressions = newAggregateExpressions)
-case g: Generate =>
-  // If join is false, we will convert it to true for getting from 
the child the missing
-  // attributes that its child might have or could have.
-  val missing = missingAttrs -- g.child.ou

[GitHub] spark pull request #20094: [SPARK-20392][SQL][followup] should not add extra...

2017-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20094#discussion_r158889013
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -723,7 +726,7 @@ class Analyzer(
 s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, 
attributeRewrites))
 }
   }
-  AnalysisBarrier(newRight)
+  newRight
--- End diff --

`newRight` was only introduced so it could be wrapped in `AnalysisBarrier`; we can get rid of it now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19683: [SPARK-21657][SQL] optimize explode quadratic mem...

2017-12-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19683#discussion_r158890765
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -276,22 +276,24 @@ class PlanParserSuite extends AnalysisTest {
 assertEqual(
   "select * from t lateral view explode(x) expl as x",
   table("t")
-.generate(explode, join = true, outer = false, Some("expl"), 
Seq("x"))
+.generate(explode, alias = Some("expl"), outputNames = Seq("x"))
 .select(star()))
 
 // Multiple lateral views
+val exploded = table("t")
+  .generate(explode, alias = Some("expl"))
+
 assertEqual(
   """select *
 |from t
 |lateral view explode(x) expl
 |lateral view outer json_tuple(x, y) jtup q, z""".stripMargin,
-  table("t")
-.generate(explode, join = true, outer = false, Some("expl"), 
Seq.empty)
-.generate(jsonTuple, join = true, outer = true, Some("jtup"), 
Seq("q", "z"))
+  exploded
+.generate(jsonTuple, outer = true, alias = Some("jtup"), 
outputNames = Seq("q", "z"))
--- End diff --

and remove
```
val exploded = table("t")
  .generate(explode, alias = Some("expl"))
```
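
Put together, the assertion with both suggestions applied would read roughly as follows (a sketch assembled from the diff above, relying on the suite's existing helpers):

```scala
assertEqual(
  """select *
    |from t
    |lateral view explode(x) expl
    |lateral view outer json_tuple(x, y) jtup q, z""".stripMargin,
  table("t")
    .generate(explode, alias = Some("expl"))
    .generate(jsonTuple, outer = true, alias = Some("jtup"), outputNames = Seq("q", "z"))
    .select(star()))
```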


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


