[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190370842
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
+val underlyingIt = map.readingIterator
+assert( underlyingIt != null )
+val underlyingMapIterator = underlyingIt.upstream
+assert(underlyingMapIterator != null)
+val underlyingMapIteratorClass = underlyingMapIterator.getClass
+assert(underlyingMapIteratorClass.getEnclosingClass == 
classOf[AppendOnlyMap[_, _]])
+
+val underlyingMap = map.currentMap
+assert(underlyingMap != null)
+
+val first50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert( map.numSpills == 0 )
+map.spill(Long.MaxValue, null)
+// these asserts try to show that we're no longer holding references 
to the underlying map.
+// it'd be nice to use something like
+// 
https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+// (lines 69-89)
+assert(map.currentMap == null)
+assert(underlyingIt.upstream ne underlyingMapIterator)
+assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass)
+assert(underlyingIt.upstream.getClass.getEnclosingClass != 
classOf[AppendOnlyMap[_, _]])
+
+val next50Keys = for ( _ <- 0 until 50) yield {
+  val (k, vs) = it.next
+  val sortedVs = vs.sorted
+  assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+  k
+}
+assert(!it.hasNext)
+val keys = (first50Keys ++ next50Keys).sorted
+assert(keys == (0 until 100))
+  }
+
+  test("drop all references to the underlying map once the iterator is 
exhausted") {
--- End diff --

:+1: 


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190371425
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
 true
   }
 }
 
+private def destroy() : Unit = {
+  freeCurrentMap()
+  upstream = Iterator.empty
+}
+
+private[ExternalAppendOnlyMap]
--- End diff --

Hmm... the class itself is private (slightly relaxed to package-private to ease testing), so I'm not sure what the benefit of making the method public would be. In any case, I think that once we see a use case for making this method public, we'd probably have to change the iterator/external-map classes further.


---




[GitHub] spark issue #21415: [SPARK-24244][SPARK-24368][SQL] Passing only required co...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21415
  
Can one of the admins verify this patch?


---




[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21390
  
**[Test build #91064 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91064/testReport)**
 for PR 21390 at commit 
[`0df8e4e`](https://github.com/apache/spark/commit/0df8e4ec71971468854b6d778a1899df8df71211).


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21369
  
**[Test build #91065 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91065/testReport)**
 for PR 21369 at commit 
[`e3c61fd`](https://github.com/apache/spark/commit/e3c61fd596cdb6eb6b54a6eee0004ca79cb553af).


---




[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21311
  
**[Test build #91052 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91052/testReport)**
 for PR 21311 at commit 
[`b8b6324`](https://github.com/apache/spark/commit/b8b632450d824d7abf092c10f8e94f6938be1104).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21411: [SPARK-24367][SQL]Parquet: use JOB_SUMMARY_LEVEL instead...

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21411
  
cc @michal-databricks 


---




[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21416
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21416
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91068/
Test FAILed.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190403327
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
   }
 
   test("blocks waiting for new rows") {
-val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val rdd = new ContinuousShuffleReadRDD(
+  sparkContext, numPartitions = 1, checkpointIntervalMs = 
Long.MaxValue)
+val epoch = rdd.compute(rdd.partitions(0), ctx)
 
 val readRowThread = new Thread {
   override def run(): Unit = {
-// set the non-inheritable thread local
-TaskContext.setTaskContext(ctx)
-val epoch = rdd.compute(rdd.partitions(0), ctx)
-epoch.next().getInt(0)
+try {
+  epoch.next().getInt(0)
+} catch {
+  case _: InterruptedException => // do nothing - expected at test 
ending
+}
   }
 }
 
 try {
   readRowThread.start()
   eventually(timeout(streamingTimeout)) {
-assert(readRowThread.getState == Thread.State.WAITING)
+assert(readRowThread.getState == Thread.State.TIMED_WAITING)
--- End diff --

Why did this change from WAITING to TIMED_WAITING? Can the thread be in either state? Would this cause flakiness?


---




[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-05-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r190405223
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala ---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.status.api.v1.PeakMemoryMetrics
+
+/**
+ * Records the peak values for executor level metrics. If 
jvmUsedHeapMemory is -1, then no
+ * values have been recorded yet.
+ */
+private[spark] class PeakExecutorMetrics {
+  private var _jvmUsedHeapMemory = -1L;
+  private var _jvmUsedNonHeapMemory = 0L;
+  private var _onHeapExecutionMemory = 0L
+  private var _offHeapExecutionMemory = 0L
+  private var _onHeapStorageMemory = 0L
+  private var _offHeapStorageMemory = 0L
+  private var _onHeapUnifiedMemory = 0L
+  private var _offHeapUnifiedMemory = 0L
+  private var _directMemory = 0L
+  private var _mappedMemory = 0L
+
+  def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory
+
+  def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory
+
+  def onHeapExecutionMemory: Long = _onHeapExecutionMemory
+
+  def offHeapExecutionMemory: Long = _offHeapExecutionMemory
+
+  def onHeapStorageMemory: Long = _onHeapStorageMemory
+
+  def offHeapStorageMemory: Long = _offHeapStorageMemory
+
+  def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory
+
+  def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory
+
+  def directMemory: Long = _directMemory
+
+  def mappedMemory: Long = _mappedMemory
+
+  /**
+   * Compare the specified memory values with the saved peak executor 
memory
+   * values, and update if there is a new peak value.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = {
+var updated: Boolean = false
+
+if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) {
+  _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory
+  updated = true
+}
+if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) {
+  _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory
+  updated = true
+}
+if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) {
+  _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory
+  updated = true
+}
+if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) {
+  _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory
+  updated = true
+}
+if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) {
+  _onHeapStorageMemory = executorMetrics.onHeapStorageMemory
+  updated = true
+}
+if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) {
+  _offHeapStorageMemory = executorMetrics.offHeapStorageMemory
--- End diff --

I know Spark has this kind of code all over the place already, but I really hate how error-prone it is -- it's way too easy for a copy-paste error to result in comparing the wrong two metrics, updating the wrong value, forgetting to update this when another metric is added, etc.

I just opened https://github.com/edwinalu/spark/pull/1 as another way to do this that would eliminate a ton of boilerplate, IMO.
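
For illustration only, here is a minimal, self-contained sketch of that table-driven style (hypothetical names such as `MetricSnapshot`; this is not the code from that PR): adding a metric becomes one entry in a list, and the compare/update loop cannot mix up fields.

```scala
// Sketch only: `MetricSnapshot` stands in for ExecutorMetrics.
case class MetricSnapshot(
    jvmUsedHeapMemory: Long,
    jvmUsedNonHeapMemory: Long,
    onHeapExecutionMemory: Long)

object PeakMetricsSketch {
  // Single source of truth: metric name plus getter. New metrics are added here only.
  val metricGetters: Seq[(String, MetricSnapshot => Long)] = Seq(
    "jvmUsedHeapMemory" -> (_.jvmUsedHeapMemory),
    "jvmUsedNonHeapMemory" -> (_.jvmUsedNonHeapMemory),
    "onHeapExecutionMemory" -> (_.onHeapExecutionMemory))
}

class PeakMetricsSketch {
  import PeakMetricsSketch._

  // Peaks are stored positionally, in the same order as metricGetters; -1 means "no value yet".
  private val peaks = Array.fill(metricGetters.size)(-1L)

  /** Returns true if any peak was updated. */
  def compareAndUpdate(current: MetricSnapshot): Boolean = {
    var updated = false
    metricGetters.zipWithIndex.foreach { case ((_, get), i) =>
      val value = get(current)
      if (value > peaks(i)) {
        peaks(i) = value
        updated = true
      }
    }
    updated
  }
}
```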


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL] Provide an option to ignore co...

2018-05-23 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r190399084
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2408,4 +2408,24 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   spark.read.option("mode", "PERMISSIVE").option("encoding", 
"UTF-8").json(Seq(badJson).toDS()),
   Row(badJson))
   }
+
+  test("SPARK-23772 ignore column of all null values or empty array during 
schema inference") {
+ withTempPath { tempDir =>
+  val path = tempDir.getAbsolutePath
+  Seq(
+"""{"a":null, "b":[null, null], "c":null, "d":[[], [null]], 
"e":{}}""",
+"""{"a":null, "b":[null], "c":[], "d": [null, []], "e":{}}""",
+"""{"a":null, "b":[], "c":[], "d": null, "e":null}""")
+.toDS().write.mode("overwrite").text(path)
--- End diff --

Do you need the `overwrite` mode?


---




[GitHub] spark pull request #20929: [SPARK-23772][SQL] Provide an option to ignore co...

2018-05-23 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r190401748
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2408,4 +2408,24 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   spark.read.option("mode", "PERMISSIVE").option("encoding", 
"UTF-8").json(Seq(badJson).toDS()),
   Row(badJson))
   }
+
+  test("SPARK-23772 ignore column of all null values or empty array during 
schema inference") {
+ withTempPath { tempDir =>
+  val path = tempDir.getAbsolutePath
+  Seq(
+"""{"a":null, "b":[null, null], "c":null, "d":[[], [null]], 
"e":{}}""",
+"""{"a":null, "b":[null], "c":[], "d": [null, []], "e":{}}""",
+"""{"a":null, "b":[], "c":[], "d": null, "e":null}""")
--- End diff --

Could you add a test for the case when `dropFieldIfAllNull` is set to `true` but not all values in a column are `null`?
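
For illustration, such a test could look roughly like this (a sketch relying on this suite's helpers and the `dropFieldIfAllNull` option name; the exact inferred types may differ):

```scala
test("SPARK-23772: do not drop a column that has at least one non-null value") {
  withTempPath { tempDir =>
    val path = tempDir.getAbsolutePath
    Seq(
      """{"a":null, "b":null}""",
      """{"a":null, "b":1}""").toDS().write.text(path)
    val df = spark.read.option("dropFieldIfAllNull", "true").json(path)
    // "a" is all-null and may be dropped, but "b" must survive schema inference.
    assert(df.schema.fieldNames.contains("b"))
    checkAnswer(df.select("b"), Seq(Row(null), Row(1L)))
  }
}
```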


---




[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/21372
  
Yep. Both the JIRA and the PR description are updated.


---




[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/21372
  
Retest this please.


---




[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21372
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21415: [SPARK-24244][SPARK-24368][SQL] Passing only required co...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21415
  
**[Test build #91061 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91061/testReport)**
 for PR 21415 at commit 
[`0aef16b`](https://github.com/apache/spark/commit/0aef16b5e9017fb398e0df2f3694a1db1f4d7cb8).


---




[GitHub] spark issue #21394: [SPARK-24329][SQL] Test for skipping multi-space lines

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21394
  
**[Test build #91063 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91063/testReport)**
 for PR 21394 at commit 
[`a2cdc29`](https://github.com/apache/spark/commit/a2cdc2916d70c7031f41b1b9795af0e4b50cc977).


---




[GitHub] spark issue #21410: [SPARK-24366][SQL] Improving of error messages for type ...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21410
  
**[Test build #91062 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91062/testReport)**
 for PR 21410 at commit 
[`26cc2f8`](https://github.com/apache/spark/commit/26cc2f84ee6324db23936e20816d240031211311).


---




[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21390
  
retest this please


---




[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21390
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21311
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91052/
Test FAILed.


---




[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21390
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3521/
Test PASSed.


---




[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21311
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21369
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3522/
Test PASSed.


---




[GitHub] spark issue #21404: [SPARK-24360][SQL] Support Hive 3.0 metastore

2018-05-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/21404
  
I'm investigating a timing issue here. Spark loads the Hive metastore classes lazily. Here, Spark is trying to access Hive metastore tables like `DBS` before they are created.


---




[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21311
  
retest this please


---




[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21372
  
Could you document the description of the bug in both the JIRA and the PR description? We also need to mention which ORC reader is affected.


---




[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21416
  
**[Test build #91068 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91068/testReport)**
 for PR 21416 at commit 
[`ec91220`](https://github.com/apache/spark/commit/ec91220d5e8658500be6d049f8ab3496fc8a914e).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190399106
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---
@@ -397,6 +399,68 @@ class ColumnExpressionSuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
+  test("isinSet: Scala Set") {
+val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
+checkAnswer(df.filter($"a".isinSet(Set(1, 2))),
+  df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2))
+checkAnswer(df.filter($"a".isinSet(Set(3, 2))),
+  df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2))
+checkAnswer(df.filter($"a".isinSet(Set(3, 1))),
+  df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1))
+
+// Auto casting should work with mixture of different types in Set
+checkAnswer(df.filter($"a".isinSet(Set(1.toShort, "2"))),
+  df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2))
+checkAnswer(df.filter($"a".isinSet(Set("3", 2.toLong))),
+  df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2))
+checkAnswer(df.filter($"a".isinSet(Set(3, "1"))),
+  df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1))
+
+checkAnswer(df.filter($"b".isinSet(Set("y", "x"))),
+  df.collect().toSeq.filter(r => r.getString(1) == "y" || 
r.getString(1) == "x"))
+checkAnswer(df.filter($"b".isinSet(Set("z", "x"))),
+  df.collect().toSeq.filter(r => r.getString(1) == "z" || 
r.getString(1) == "x"))
+checkAnswer(df.filter($"b".isinSet(Set("z", "y"))),
+  df.collect().toSeq.filter(r => r.getString(1) == "z" || 
r.getString(1) == "y"))
+
+val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b")
+
+intercept[AnalysisException] {
+  df2.filter($"a".isinSet(Set($"b")))
+}
--- End diff --

Let's check the error message to prevent a future regression, such as a different `AnalysisException` being raised here.
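
For example, something along these lines (a sketch; the exact message substring is an assumption and should be taken from the actual exception):

```scala
// Sketch: pin down (part of) the message so a different AnalysisException would fail the test.
val e = intercept[AnalysisException] {
  df2.filter($"a".isinSet(Set($"b")))
}
assert(e.getMessage.contains("data type mismatch"))
```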


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190401100
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
+  null
+
+// The completion service guarantees this future will be 
available immediately.
+case future => future.get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just 
took from.
+completion.submit(completionTask(writerId))
+r
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers 
have sent epoch markers,
+// the epoch is over; otherwise we need to poll from the 
remaining writers.
+writerEpochMarkersReceived(writerId) = true
+if (writerEpochMarkersReceived.forall(flag => flag)) {
+  finished = true
+  // Break out of the while loop and end the iterator.
+  return null
--- End diff --

Super nit: I personally find this sort of escape hard to read in code. Consider making a separate flag to end the while loop.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190400926
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
--- End diff --

Move this inside... it's hard to follow the pattern `x = // some large code structure`.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190403472
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
   }
 
   test("blocks waiting for new rows") {
-val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val rdd = new ContinuousShuffleReadRDD(
+  sparkContext, numPartitions = 1, checkpointIntervalMs = 
Long.MaxValue)
+val epoch = rdd.compute(rdd.partitions(0), ctx)
 
 val readRowThread = new Thread {
   override def run(): Unit = {
-// set the non-inheritable thread local
-TaskContext.setTaskContext(ctx)
-val epoch = rdd.compute(rdd.partitions(0), ctx)
-epoch.next().getInt(0)
+try {
+  epoch.next().getInt(0)
+} catch {
+  case _: InterruptedException => // do nothing - expected at test 
ending
+}
   }
 }
 
 try {
   readRowThread.start()
   eventually(timeout(streamingTimeout)) {
-assert(readRowThread.getState == Thread.State.WAITING)
+assert(readRowThread.getState == Thread.State.TIMED_WAITING)
   }
 } finally {
   readRowThread.interrupt()
   readRowThread.join()
 }
   }
+
+  test("multiple writers") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 
1, numShuffleWriters = 3)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+send(
+  endpoint,
+  ReceiverRow(0, unsafeRow("writer0-row0")),
+  ReceiverRow(1, unsafeRow("writer1-row0")),
+  ReceiverRow(2, unsafeRow("writer2-row0")),
+  ReceiverEpochMarker(0),
+  ReceiverEpochMarker(1),
+  ReceiverEpochMarker(2)
+)
+
+val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(firstEpoch.toSeq.map(_.getUTF8String(0).toString).toSet ==
+  Set("writer0-row0", "writer1-row0", "writer2-row0"))
+  }
+
+  test("epoch only ends when all writers send markers") {
+val rdd = new ContinuousShuffleReadRDD(
+  sparkContext, numPartitions = 1, numShuffleWriters = 3, 
checkpointIntervalMs = Long.MaxValue)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+send(
+  endpoint,
+  ReceiverRow(0, unsafeRow("writer0-row0")),
+  ReceiverRow(1, unsafeRow("writer1-row0")),
+  ReceiverRow(2, unsafeRow("writer2-row0")),
+  ReceiverEpochMarker(0),
+  ReceiverEpochMarker(2)
+)
+
+val epoch = rdd.compute(rdd.partitions(0), ctx)
+val rows = (0 until 3).map(_ => epoch.next()).toSet
+assert(rows.map(_.getUTF8String(0).toString) ==
+  Set("writer0-row0", "writer1-row0", "writer2-row0"))
+
+// After checking the right rows, block until we get an epoch marker 
indicating there's no next.
+// (Also fail the assertion if for some reason we get a row.)
+val readEpochMarkerThread = new Thread {
+  override def run(): Unit = {
+assert(!epoch.hasNext)
+  }
+}
+
+readEpochMarkerThread.start()
+eventually(timeout(streamingTimeout)) {
+  assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING)
--- End diff --

Same question as above ... is this the only possible thread state?


---




[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21372
  
**[Test build #91055 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91055/testReport)**
 for PR 21372 at commit 
[`954d1d9`](https://github.com/apache/spark/commit/954d1d92ade183d8774b75e03cb02e16635cde48).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21372
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21372
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91055/
Test FAILed.


---




[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21372
  
**[Test build #91070 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91070/testReport)**
 for PR 21372 at commit 
[`954d1d9`](https://github.com/apache/spark/commit/954d1d92ade183d8774b75e03cb02e16635cde48).


---




[GitHub] spark issue #21410: [SPARK-24366][SQL] Improving of error messages for type ...

2018-05-23 Thread MaxGekk
Github user MaxGekk commented on the issue:

https://github.com/apache/spark/pull/21410
  
jenkins, retest this, please


---




[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190375595
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
 ---
@@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C](
   } else {
 logInfo(s"Task ${context.taskAttemptId} force spilling in-memory 
map to disk and " +
   s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-nextUpstream = spillMemoryIteratorToDisk(upstream)
+val nextUpstream = spillMemoryIteratorToDisk(upstream)
+assert(!upstream.hasNext)
 hasSpilled = true
+upstream = nextUpstream
--- End diff --

@cloud-fan, the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself.
The destroy method first removes the reference held via the upstream iterator, then delegates to the method that clears the reference held via the external map member (`currentMap`, I think), so unless we've missed another reference we should be fine.

As I wrote above, I think there's a potentially more fundamental issue with `CompletionIterator`, which keeps holding references via its `sub` and `completionFunction` members; these can keep some objects from being collected and could be released once the iterator is exhausted. There might be some more 'candidates' like `LazyIterator` and `InterruptibleIterator`; I think this deserves some more investigation.
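
To make the idea concrete, a minimal sketch of a completion-style wrapper that drops its references once exhausted might look like this (illustration only, not the actual Spark `CompletionIterator`):

```scala
// Sketch only: once the wrapped iterator is exhausted, run the completion
// function and drop both references so the GC can reclaim what they hold.
class ReleasingCompletionIterator[A](
    private var sub: Iterator[A],
    private var completionFunction: () => Unit) extends Iterator[A] {

  private var completed = false

  override def next(): A = sub.next()

  override def hasNext: Boolean = {
    val r = !completed && sub.hasNext
    if (!r && !completed) {
      completed = true
      completionFunction()
      sub = Iterator.empty        // release the wrapped iterator
      completionFunction = null   // release the closure (and whatever it captures)
    }
    r
  }
}
```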


---




[GitHub] spark pull request #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReade...

2018-05-23 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21295#discussion_r190377736
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 ---
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedSQLContext
   }
 }
   }
+
+  test("SPARK-24230: filter row group using dictionary") {
+withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
--- End diff --

Looks like the problem is a bug in Parquet. It is using the [stats property 
instead of the dictionary 
property](https://github.com/apache/parquet-mr/blob/8bbc6cb95fd9b4b9e86c924ca1e40fd555ecac1d/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java#L83).
 This is minor because there is almost no reason to turn either one off now 
that we've built more confidence in the filters.
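
If we wanted this test to be robust to that behaviour until the Parquet fix lands, one option is to keep both flags in sync (a sketch within this suite; both property names are standard Parquet settings, and whether this is actually needed depends on the Parquet version in use):

```scala
withSQLConf(
    "parquet.filter.dictionary.enabled" -> "true",
    // HadoopReadOptions currently reads the stats property for this setting,
    // so keep both enabled to make sure dictionary filtering really kicks in.
    "parquet.filter.stats.enabled" -> "true") {
  // ... row-group filtering assertions go here ...
}
```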


---




[GitHub] spark issue #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase w...

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21295
  
Thanks for your investigation! 

Also, congratulations! 


---




[GitHub] spark pull request #21266: [SPARK-24206][SQL] Improve DataSource read benchm...

2018-05-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21266


---




[GitHub] spark pull request #21379: [SPARK-24327][SQL] Add an option to quote a parti...

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21379#discussion_r190382338
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
 ---
@@ -78,7 +79,12 @@ private[sql] object JDBCRelation extends Logging {
 // Overflow and silliness can happen if you subtract then divide.
 // Here we get a little roundoff, but that's (hopefully) OK.
 val stride: Long = upperBound / numPartitions - lowerBound / 
numPartitions
-val column = partitioning.column
+val column = if (jdbcOptions.quotePartitionColumnName) {
+  val dialect = JdbcDialects.get(jdbcOptions.url)
+  dialect.quoteIdentifier(partitioning.column)
--- End diff --

Yeah


---




[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21342
  
Thanks! Merged to master.


---




[GitHub] spark pull request #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21372#discussion_r190383386
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
 ---
@@ -169,6 +170,14 @@ abstract class OrcSuite extends OrcTest with 
BeforeAndAfterAll {
   }
 }
   }
+
+  test("SPARK-24322 Fix incorrect workaround for bug in 
java.sql.Timestamp") {
+withTempPath { path =>
+  val ts = Timestamp.valueOf("1900-05-05 12:34:56.000789")
+  Seq(ts).toDF.write.orc(path.getCanonicalPath)
+  checkAnswer(spark.read.orc(path.getCanonicalPath), Row(ts))
+}
+  }
--- End diff --

`OrcSourceSuite` is dedicated to the `native` ORC reader. For the `hive` ORC reader, there is `HiveOrcSourceSuite`.


---




[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...

2018-05-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21342


---




[GitHub] spark issue #21404: [SPARK-24360][SQL] Support Hive 3.0 metastore

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21404
  
@wangyum I do not think we should deprecate support for the previous versions of the Hive metastore. Many Spark users are still using them.


---




[GitHub] spark issue #21399: [SPARK-22269][BUILD] Run Java linter via SBT for Jenkins

2018-05-23 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/21399
  
Checked locally and looks good. Feel free to merge if you don't want to 
address the comment above in this PR.


---




[GitHub] spark issue #21404: [SPARK-24360][SQL] Support Hive 3.0 metastore

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21404
  
@dongjoon-hyun Thanks for your investigation!


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190389782
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate 
events.
+
+  // Executor IDs that have been requested from Kubernetes but are not 
running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors 
here for tallying the
+  // executors that are running. But, here we choose instead to maintain 
all state within this
+  // class from the persecptive of the k8s API. Therefore whether or not 
this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather 
than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which 
perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+

[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21416
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21416
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3524/
Test PASSed.


---




[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190402783
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
+  null
+
+// The completion service guarantees this future will be 
available immediately.
+case future => future.get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just 
took from.
+completion.submit(completionTask(writerId))
+r
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers 
have sent epoch markers,
+// the epoch is over; otherwise we need to poll from the 
remaining writers.
+writerEpochMarkersReceived(writerId) = true
+if (writerEpochMarkersReceived.forall(flag => flag)) {
+  finished = true
+  // Break out of the while loop and end the iterator.
+  return null
--- End diff --

Actually ... `finished` is the flag; just use that for the while loop, i.e. `while (!finished && nextRow == null)`.
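
For example, the loop could be restructured roughly like this (a sketch of the suggestion against the code quoted above, using the names from that diff; not the final patch):

```scala
override def getNext(): UnsafeRow = {
  var nextRow: UnsafeRow = null
  // `finished` is NextIterator's flag; let it terminate the loop instead of `return`.
  while (!finished && nextRow == null) {
    completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
      case null =>
        // The poll timed out; we expect at least an epoch marker per checkpoint interval.
        logWarning(s"Completion service failed to make progress after $checkpointIntervalMs ms")
      case future => future.get() match {
        case ReceiverRow(writerId, r) =>
          // Start reading the next element from the queue we just took from.
          completion.submit(completionTask(writerId))
          nextRow = r
        case ReceiverEpochMarker(writerId) =>
          writerEpochMarkersReceived(writerId) = true
          if (writerEpochMarkersReceived.forall(identity)) {
            finished = true
          }
      }
    }
  }
  nextRow
}
```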


---




[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21346
  
**[Test build #91056 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91056/testReport)**
 for PR 21346 at commit 
[`3098b9c`](https://github.com/apache/spark/commit/3098b9cd9ffc29517b446bb660fe5be9f0031cc1).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21319: [SPARK-24267][SQL] explicitly keep DataSourceReader in D...

2018-05-23 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21319
  
@cloud-fan, what about adding support for v2 pushdown in the stats visitor 
instead?

Here's the idea: when the visitor hits a `Filter` or a `Project`, it tries to match the plan using `PhysicalOperation`. If that works and the underlying relation is a `DataSourceV2Relation`, it does the pushdown to configure a reader and returns stats from it.

That would give us the ability to do pushdown on conversion to physical 
plan, but we'd get correct stats. The only drawback is that we would 
temporarily make the stats code a bit larger, but at least it is separate so we 
can get the logical plans right and fix stats on a separate schedule.


---




[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...

2018-05-23 Thread MaxGekk
GitHub user MaxGekk opened a pull request:

https://github.com/apache/spark/pull/21415

[SPARK-24244][SPARK-24368][SQL] Passing only required columns to the CSV 
parser

## What changes were proposed in this pull request?

The uniVocity parser allows specifying only the required column names or indexes for [parsing](https://www.univocity.com/pages/parsers-tutorial), for example:

```
// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);
```
In this PR, I propose to extract the indexes of the required columns from the required schema and pass them to the CSV parser. Benchmarks show the following improvements when parsing 1000 columns:

```
Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2
```

**Note**: Compared to the current implementation, the changes can return a different result for malformed rows in the `DROPMALFORMED` and `FAILFAST` modes if only a subset of all columns is requested. To keep the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
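
As a standalone illustration of the mechanism (hypothetical column names, not the PR's code), the required indexes can be derived from the two schemas and handed to uniVocity:

```scala
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

// Hypothetical schemas: the full data schema and the columns Spark actually asked for.
val dataColumns = Seq("id", "name", "price", "qty", "ts")
val requiredColumns = Seq("price", "id")

val settings = new CsvParserSettings()
// Map required column names to their positions in the data schema and select them;
// the parser then skips the values of all other columns.
val indexes = requiredColumns.map(c => Integer.valueOf(dataColumns.indexOf(c)))
settings.selectIndexes(indexes: _*)

val parser = new CsvParser(settings)
val parsed = parser.parseLine("1,widget,9.99,3,2018-05-23")
// `parsed` now holds only the selected columns' values; the rest were skipped.
```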
 
## How was this patch tested?

It was tested by a new test which selects 3 columns out of 15, by existing tests, and by new benchmarks.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/MaxGekk/spark-1 csv-column-pruning2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21415.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21415


commit 9cffa0fccc33552e8fce3580a9a665b022f5bf22
Author: Maxim Gekk 
Date:   2018-03-21T20:03:11Z

Adding tests for select only requested columns

commit fdbcbe3536aee04e6a84b72ac319726614416bc3
Author: Maxim Gekk 
Date:   2018-03-21T20:42:08Z

Select indexes of required columns only

commit 578f47b0f32a76caf6c9ede8763c9cf85a1c83e9
Author: Maxim Gekk 
Date:   2018-03-24T10:41:29Z

Fix the case when number of parsed fields are not matched to required schema

commit 0f942c308dca173dad8f421e893066b8c03d35a3
Author: Maxim Gekk 
Date:   2018-03-24T11:07:55Z

Using selectIndexes if required number of columns are less than its total 
number.

commit c4b11601e9c264729e141fff3dc653d868a7ad69
Author: Maxim Gekk 
Date:   2018-03-24T11:48:43Z

Fix the test: force to read all columns

commit 8cf6eab952d79628cb8ee2ff7b92dadae60ec686
Author: Maxim Gekk 
Date:   2018-04-06T20:55:35Z

Fix merging conflicts

commit 5b2f0b9d7346f927842bc1a2089a7299876f1894
Author: Maxim Gekk 
Date:   2018-04-29T11:52:08Z

Benchmarks for many columns

commit 6d1e902c0011e88dbafb65c4ad6e7431370ed12d
Author: Maxim Gekk 
Date:   2018-04-29T12:59:58Z

Make size of requiredSchema equals to amount of selected columns

commit 4525795f7337cbd081f569cd79d7f90cb58edbee
Author: Maxim Gekk 
Date:   2018-04-29T13:36:54Z

Removing selection of all columns

commit 8809cecf93d8e7a97eca827d9e8637a7eb5b2449
Author: Maxim Gekk 
Date:   2018-04-29T13:50:44Z

Updating benchmarks for select indexes

commit dc97ceb96185ed2eaa05fbe1aee8ecfe8ccb7e7d
Author: Maxim Gekk 
Date:   2018-05-05T19:19:17Z

Addressing Herman's review comments

commit 51b31483263e13cd85b19b3efea65188945eda99
Author: Maxim Gekk 
Date:   2018-05-10T18:39:38Z

Updated benchmark result for recent changes

commit e3958b1468b490b548574b53512f0d83850e6f6f
Author: Maxim Gekk 
Date:   2018-05-10T18:46:17Z

Add ticket number to test title

commit a4a0a549156a15011c33c7877a35f244d75b7a4f
Author: Maxim Gekk 
Date:   2018-05-10T19:02:24Z

Removing unnecessary benchmark

commit fa860157c982846524bd8f151daf8a2154117b34
Author: Maxim Gekk 
Date:   2018-05-13T18:49:49Z

Updating the migration guide

commit 15528d20a74904c14c58bf3ad54c9a552c519430
Author: Maxim Gekk 
Date:   2018-05-13T18:55:06Z

Moving some values back as it was.

commit f90daa7ea33d119be978c27de10978c2d6281e25
Author: Maxim Gekk 
Date:   2018-05-13T18:58:20Z

Renaming the test title

commit 4d9873d39277b9cbaee892957c06bfc2cb9a52f1
Author: Maxim Gekk 
Date:   2018-05-17T20:02:47Z

Improving of the migration guide

commit 7dcfc7a7664fcd5311cb352f0ea7a24b3cc1c639
Author: Maxim Gekk 
Date:   2018-05-17T20:12:49Z

Merge remote-tracking branch 'origin/master' into csv-column-pruning

# Conflicts:
#   
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala
#   

[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...

2018-05-23 Thread JoshRosen
Github user JoshRosen commented on the issue:

https://github.com/apache/spark/pull/21342
  
Updated changes LGTM. Thanks for working on this!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21266
  
**[Test build #91049 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91049/testReport)**
 for PR 21266 at commit 
[`5eab1a5`](https://github.com/apache/spark/commit/5eab1a51badca648c52287f7eb88dcdb481c5c3b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReade...

2018-05-23 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21295#discussion_r190378887
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
 ---
@@ -225,7 +226,8 @@ protected void initialize(String path, List 
columns) throws IOException
 this.sparkSchema = new 
ParquetToSparkSchemaConverter(config).convert(requestedSchema);
 this.reader = new ParquetFileReader(
 config, footer.getFileMetaData(), file, blocks, 
requestedSchema.getColumns());
-for (BlockMetaData block : blocks) {
+// use the blocks from the reader in case some do not match filters 
and will not be read
+for (BlockMetaData block : reader.getRowGroups()) {
--- End diff --

Dictionary filtering is off by default in 1.8.x. It was enabled after we 
built confidence in its correctness in 1.9.x.

We should backport this fix to 2.3.x also, but the only downside to not 
having it is that dictionary filtering will throw an exception when it is 
enabled. So the feature just isn't available.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmar...

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21288#discussion_r190382044
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
 ---
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure read performance with Filter pushdown.
+ * To run this:
+ *  spark-submit --class  
+ */
+object FilterPushdownBenchmark {
+  val conf = new SparkConf()
+.setAppName("FilterPushdownBenchmark")
+.setIfMissing("spark.master", "local[1]")
+.setIfMissing("spark.driver.memory", "3g")
+.setIfMissing("spark.executor.memory", "3g")
+.setIfMissing("orc.compression", "snappy")
+.setIfMissing("spark.sql.parquet.compression.codec", "snappy")
+
+  private val spark = SparkSession.builder().config(conf).getOrCreate()
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private def prepareTable(
+  dir: File, numRows: Int, width: Int, useStringForValue: Boolean): 
Unit = {
+import spark.implicits._
+val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i")
+val valueCol = if (useStringForValue) {
+  monotonically_increasing_id().cast("string")
+} else {
+  monotonically_increasing_id()
+}
+val df = spark.range(numRows).map(_ => 
Random.nextLong).selectExpr(selectExpr: _*)
+  .withColumn("value", valueCol)
+  .sort("value")
+
+saveAsOrcTable(df, dir.getCanonicalPath + "/orc")
+saveAsParquetTable(df, dir.getCanonicalPath + "/parquet")
+  }
+
+  private def prepareStringDictTable(
+  dir: File, numRows: Int, numDistinctValues: Int, width: Int): Unit = 
{
+val selectExpr = (0 to width).map {
+  case 0 => s"CAST(id % $numDistinctValues AS STRING) AS value"
+  case i => s"CAST(rand() AS STRING) c$i"
+}
+val df = spark.range(numRows).selectExpr(selectExpr: _*).sort("value")
+
+saveAsOrcTable(df, dir.getCanonicalPath + "/orc")
+saveAsParquetTable(df, dir.getCanonicalPath + "/parquet")
+  }
+
+  private def saveAsOrcTable(df: DataFrame, dir: String): Unit = {
+df.write.mode("overwrite").orc(dir)
+spark.read.orc(dir).createOrReplaceTempView("orcTable")
+  }
+
+  private def saveAsParquetTable(df: DataFrame, dir: String): Unit = {
+df.write.mode("overwrite").parquet(dir)
+spark.read.parquet(dir).createOrReplaceTempView("parquetTable")
+  }
+
+  def filterPushDownBenchmark(
+  values: Int,
+  title: String,
+  whereExpr: String,
+  selectExpr: String = "*"): Unit = {
+val benchmark = new Benchmark(title, values, minNumIters = 5)
+
+Seq(false, true).foreach { pushDownEnabled =>
+  val 

[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21372
  
Before we do the merge, could you address the comment: 
https://github.com/apache/spark/pull/21372#discussion_r190073105?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21369
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21399: [SPARK-22269][BUILD] Run Java linter via SBT for ...

2018-05-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21399#discussion_r190383825
  
--- Diff: dev/run-tests.py ---
@@ -574,8 +574,7 @@ def main():
 or f.endswith("checkstyle.xml")
 or 
f.endswith("checkstyle-suppressions.xml")
 for f in changed_files):
-# run_java_style_checks()
-pass
+run_java_style_checks()
--- End diff --

This is really minor, but now you'll be running this when building with 
maven too. That could be avoided by checking that the build is not using maven; 
and also changing the maven target from `package` to `verify` so that these 
checks run.

You can add `[build-maven]` to the PR title to try it out (or locally with 
`AMPLAB_JENKINS_BUILD_TOOL=maven`).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21311
  
**[Test build #91066 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91066/testReport)**
 for PR 21311 at commit 
[`b8b6324`](https://github.com/apache/spark/commit/b8b632450d824d7abf092c10f8e94f6938be1104).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190386369
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate 
events.
+
+  // Executor IDs that have been requested from Kubernetes but are not 
running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors 
here for tallying the
+  // executors that are running. But, here we choose instead to maintain 
all state within this
+  // class from the perspective of the k8s API. Therefore whether or not 
this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather 
than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which 
perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+

[GitHub] spark issue #21411: [SPARK-24367][SQL]Parquet: use JOB_SUMMARY_LEVEL instead...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21411
  
**[Test build #91067 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91067/testReport)**
 for PR 21411 at commit 
[`3a6a87b`](https://github.com/apache/spark/commit/3a6a87ba0e0bcb36a7a023edbd35fe411ed2fd6d).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21411: [SPARK-24367][SQL]Parquet: use JOB_SUMMARY_LEVEL instead...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21411
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3523/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21411: [SPARK-24367][SQL]Parquet: use JOB_SUMMARY_LEVEL instead...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21411
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190396813
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -42,16 +47,24 @@ case class ContinuousShuffleReadPartition(index: Int, 
queueSize: Int) extends Pa
  * RDD at the map side of each continuous processing shuffle task. 
Upstream tasks send their
  * shuffle output to the wrapped receivers in partitions of this RDD; each 
of the RDD's tasks
  * poll from their receiver until an epoch marker is sent.
+ *
+ * @param sc the RDD context
+ * @param numPartitions the number of read partitions for this RDD
+ * @param queueSize the size of the row buffers to use
+ * @param numShuffleWriters the number of continuous shuffle writers 
feeding into this RDD
+ * @param checkpointIntervalMs the checkpoint interval of the streaming 
query
  */
 class ContinuousShuffleReadRDD(
 sc: SparkContext,
 numPartitions: Int,
-queueSize: Int = 1024)
+queueSize: Int = 1024,
+numShuffleWriters: Int = 1,
+checkpointIntervalMs: Long = 1000)
--- End diff --

Isn't this the same as epochInterval? The term "epoch" is better known in 
the code than "checkpoint".


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21416
  
**[Test build #91069 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91069/testReport)**
 for PR 21416 at commit 
[`6459015`](https://github.com/apache/spark/commit/6459015fd923371b1515177db477afe1ce2813aa).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21385: [SPARK-24234][SS] Support multiple row writers in contin...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21385
  
**[Test build #91054 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91054/testReport)**
 for PR 21385 at commit 
[`e02d714`](https://github.com/apache/spark/commit/e02d714f6c6774eb275bf86cc81e09dd80f40b11).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21414: [SPARK-24368][SQL] Removing columnPruning from CSVOption...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21414
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21414: [SPARK-24368][SQL] Removing columnPruning from CSVOption...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21414
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91057/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21346
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91056/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21346
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21266
  
LGTM

Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190385827
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
   
+  
--- End diff --

We always add the dependency to the top-level pom, and then in the lower-level 
poms we reference the dependent modules without listing their versions.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-23 Thread dbtsai
GitHub user dbtsai opened a pull request:

https://github.com/apache/spark/pull/21416

[SPARK-24371] [SQL] Added isinSet in DataFrame API for Scala and Java.

## What changes were proposed in this pull request?

Implemented **`isinSet`** in DataFrame API for both Scala and Java, so 
users can do

```scala
val profileDF = Seq(
  Some(1), Some(2), Some(3), Some(4),
  Some(5), Some(6), Some(7), None
).toDF("profileID")

val validUsers: Set[Any] = Set(6, 7.toShort, 8L, "3")

val result = profileDF.withColumn("isValid", 
$"profileID".isinSet(validUsers))

result.show(10)
"""
+---------+-------+
|profileID|isValid|
+---------+-------+
|        1|  false|
|        2|  false|
|        3|   true|
|        4|  false|
|        5|  false|
|        6|   true|
|        7|   true|
|     null|   null|
+---------+-------+
 """.stripMargin
```

Two new rules in the logical plan optimizers are added.
 
1. When there is only one element in the **`Set`**, the physical plan will 
be optimized to **`EqualTo`**, so predicate pushdown can be used. 

```scala
profileDF.filter( $"profileID".isinSet(Set(6))).explain(true)
"""
  |== Physical Plan ==
  |*(1) Project [profileID#0]
  |+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
  |   +- *(1) FileScan parquet [profileID#0] Batched: true, Format: 
Parquet, 
  | PartitionFilters: [], 
  | PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
  | ReadSchema: struct
""".stripMargin
```

2. When the **`Set`** is empty, and the input is nullable, the logical plan 
will be simplified to 

```scala
profileDF.filter( $"profileID".isinSet(Set())).explain(true)
"""
  |== Optimized Logical Plan ==
  |Filter if (isnull(profileID#0)) null else false
  |+- Relation[profileID#0] parquet
""".stripMargin
```

TODO:

1. For multiple conditions where the number of elements is below a certain 
threshold, we should still allow predicate pushdown.
2. Optimize **`In`** using **`tableswitch`** or **`lookupswitch`** when the 
number of categories is low and they are **`Int`** or **`Long`**.
3. The default immutable hash trie set is slow to query; we should benchmark 
different set implementations for faster lookups.
4. **`filter(if (condition) null else false)`** can be optimized to false.

## How was this patch tested?

Several unit tests are added.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dbtsai/spark optimize-set

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21416


commit ec91220d5e8658500be6d049f8ab3496fc8a914e
Author: DB Tsai 
Date:   2018-05-17T00:21:14Z

Added isinSet in DataFrame API for Scala and Java.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190398282
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate 
events.
+
+  // Executor IDs that have been requested from Kubernetes but are not 
running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors 
here for tallying the
+  // executors that are running. But, here we choose instead to maintain 
all state within this
+  // class from the perspective of the k8s API. Therefore whether or not 
this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather 
than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which 
perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+   

[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21416
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21416
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3525/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21385: [SPARK-24234][SS] Support multiple row writers in contin...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21385
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91054/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21385: [SPARK-24234][SS] Support multiple row writers in contin...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21385
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190402584
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
+  null
+
+// The completion service guarantees this future will be 
available immediately.
+case future => future.get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just 
took from.
+completion.submit(completionTask(writerId))
+r
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers 
have sent epoch markers,
+// the epoch is over; otherwise we need to poll from the 
remaining writers.
+writerEpochMarkersReceived(writerId) = true
+if (writerEpochMarkersReceived.forall(flag => flag)) {
+  finished = true
+  // Break out of the while loop and end the iterator.
+  return null
+} else {
+  // Poll again for the next completion result.
+  null
+}
--- End diff --

If you put the `nextRow` assignment inside the `case ReceiverRow` branch, then 
this else clause is not needed.
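
A minimal sketch of that restructuring, reusing the fields from the quoted diff 
(`completion`, `completionTask`, `writerEpochMarkersReceived`, `finished`, `nextRow`); 
this is how the suggestion could look, not the actual PR code:

```scala
// Sketch only: nextRow is assigned inside the ReceiverRow case, so the trailing
// "else null" branch is no longer needed.
completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match {
  case null =>
    logWarning(s"Completion service failed to make progress after $checkpointIntervalMs ms")
  case future => future.get() match {
    case ReceiverRow(writerId, r) =>
      // start reading the next element from the queue we just took from
      completion.submit(completionTask(writerId))
      nextRow = r
    case ReceiverEpochMarker(writerId) =>
      writerEpochMarkersReceived(writerId) = true
      if (writerEpochMarkersReceived.forall(identity)) {
        finished = true
      }
  }
}
```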


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190372105
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate 
events.
+
+  // Executor IDs that have been requested from Kubernetes but are not 
running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors 
here for tallying the
+  // executors that are running. But, here we choose instead to maintain 
all state within this
+  // class from the perspective of the k8s API. Therefore whether or not 
this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather 
than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which 
perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+   

[GitHub] spark issue #21415: [SPARK-24244][SPARK-24368][SQL] Passing only required co...

2018-05-23 Thread MaxGekk
Github user MaxGekk commented on the issue:

https://github.com/apache/spark/pull/21415
  
The difference between this PR and #21296 is that `columnPruning` is 
passed to `CSVOptions` as a parameter. It should fix the flaky `UnivocityParserSuite`.
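
A hypothetical sketch of that difference; the constructor shape below is assumed 
for illustration and the real `CSVOptions` signature may differ:

```scala
// Sketch only: columnPruning becomes an explicit constructor parameter
// instead of being read from SQLConf inside CSVOptions.
class CSVOptions(
    parameters: Map[String, String],
    val columnPruning: Boolean,
    defaultTimeZoneId: String)

// a test (e.g. UnivocityParserSuite) can then pin the behavior deterministically:
val pruningOff = new CSVOptions(Map("header" -> "true"), columnPruning = false, "UTC")
val pruningOn  = new CSVOptions(Map("header" -> "true"), columnPruning = true, "UTC")
```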


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReade...

2018-05-23 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21295#discussion_r190379077
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
 ---
@@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, 
TaskAttemptContext taskAttemptCont
 this.sparkSchema = 
StructType$.MODULE$.fromString(sparkRequestedSchemaString);
 this.reader = new ParquetFileReader(
 configuration, footer.getFileMetaData(), file, blocks, 
requestedSchema.getColumns());
-for (BlockMetaData block : blocks) {
+// use the blocks from the reader in case some do not match filters 
and will not be read
--- End diff --

Yes, we will need to backport this to the 2.3.x line. No rush to make it 
for 2.3.1 though, since dictionary filtering is off by default and this isn't a 
correctness problem.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21266
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91049/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase w...

2018-05-23 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/21295
  
Thanks for looking at this, everyone. Sorry for the delay in updating it, 
I'm currently out on paternity leave and don't have a lot of time. I'll get an 
update pushed sometime soon though.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...

2018-05-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21266
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21372#discussion_r190382953
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
 ---
@@ -169,6 +170,14 @@ abstract class OrcSuite extends OrcTest with 
BeforeAndAfterAll {
   }
 }
   }
+
+  test("SPARK-24322 Fix incorrect workaround for bug in 
java.sql.Timestamp") {
+withTempPath { path =>
+  val ts = Timestamp.valueOf("1900-05-05 12:34:56.000789")
+  Seq(ts).toDF.write.orc(path.getCanonicalPath)
+  checkAnswer(spark.read.orc(path.getCanonicalPath), Row(ts))
+}
+  }
--- End diff --

Oh, I missed these comments. The Hive ORC and ORC MR readers don't have this 
bug because they use the `java.sql.Timestamp` class to deserialize it. This happens 
when we directly access the ORC column's sub-vectors, `times` and `nanos`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21411: [SPARK-24367][SQL]Parquet: use JOB_SUMMARY_LEVEL instead...

2018-05-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21411
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21416
  
**[Test build #91068 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91068/testReport)**
 for PR 21416 at commit 
[`ec91220`](https://github.com/apache/spark/commit/ec91220d5e8658500be6d049f8ab3496fc8a914e).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190401710
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
+  null
+
+// The completion service guarantees this future will be 
available immediately.
+case future => future.get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just 
took from.
+completion.submit(completionTask(writerId))
+r
+  case ReceiverEpochMarker(writerId) =>
+// Don't read any more from this queue. If all the writers 
have sent epoch markers,
+// the epoch is over; otherwise we need to poll from the 
remaining writers.
+writerEpochMarkersReceived(writerId) = true
+if (writerEpochMarkersReceived.forall(flag => flag)) {
--- End diff --

super nit: `writerEpochMarkersReceived.forall(_ == true)` is easier to 
understand.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20929: [SPARK-23772][SQL] Provide an option to ignore co...

2018-05-23 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r190400420
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -2408,4 +2408,24 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
   spark.read.option("mode", "PERMISSIVE").option("encoding", 
"UTF-8").json(Seq(badJson).toDS()),
   Row(badJson))
   }
+
+  test("SPARK-23772 ignore column of all null values or empty array during 
schema inference") {
+ withTempPath { tempDir =>
+  val path = tempDir.getAbsolutePath
+  Seq(
+"""{"a":null, "b":[null, null], "c":null, "d":[[], [null]], 
"e":{}}""",
+"""{"a":null, "b":[null], "c":[], "d": [null, []], "e":{}}""",
+"""{"a":null, "b":[], "c":[], "d": null, "e":null}""")
+.toDS().write.mode("overwrite").text(path)
+  val df = spark.read.format("json")
+.option("dropFieldIfAllNull", true)
+.load(path)
+  val expectedSchema = new StructType()
+.add("a", NullType).add("b", NullType).add("c", NullType).add("d", 
NullType)
+.add("e", NullType)
+  assert(df.schema === expectedSchema)
--- End diff --

It seems the `DefaultEquality` is used here which applies `==`. Are there 
any reasons for `===` instead of just `==`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20929: [SPARK-23772][SQL] Provide an option to ignore co...

2018-05-23 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20929#discussion_r190397868
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -379,6 +379,8 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
* that should be used for parsing.
* `samplingRatio` (default is 1.0): defines fraction of input JSON 
objects used
* for schema inferring.
+   * `dropFieldIfAllNull` (default `false`): whether to ignore column 
of all null values or
--- End diff --

How about 
[DataStreamReader](https://github.com/maropu/spark/blob/SPARK-23772/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L277)?
 I guess the description should be added to it too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...

2018-05-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190407851
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
 ---
@@ -219,7 +219,11 @@ object ReorderAssociativeOperator extends 
Rule[LogicalPlan] {
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
 case q: LogicalPlan => q transformExpressionsDown {
-  case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+  case In(v, list) if list.isEmpty =>
+// When v is not nullable, the following expression will be 
optimized
+// to FalseLiteral which is tested in OptimizeInSuite.scala
+If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
+  case In(v, list) if list.length == 1 => EqualTo(v, list.head)
--- End diff --

Ur, @dbtsai. This will cause side effects on type casting; please see the 
following example. Could you add these kinds of test cases?
```scala
scala> sql("select '1.1' in (1), '1.1' = 1").collect()
res0: Array[org.apache.spark.sql.Row] = Array([false,true])
```
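
A hedged sketch of the kind of regression test being asked for; the test name and 
`checkAnswer`/`sql` usage follow Spark's SQL test conventions and are assumptions, 
not code from the PR:

```scala
test("SPARK-24371: single-element IN keeps IN's type coercion semantics") {
  // With IN semantics both sides are coerced the same way, so the string literal
  // does not equal the integer, while plain EqualTo coerces differently and returns true.
  checkAnswer(sql("select '1.1' in (1), '1.1' = 1"), Row(false, true))
}
```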


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...

2018-05-23 Thread eyalfa
Github user eyalfa commented on a diff in the pull request:

https://github.com/apache/spark/pull/21369#discussion_r190372506
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
 ---
@@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite 
with LocalSparkContext {
 sc.stop()
   }
 
+  test("spill during iteration") {
+val size = 1000
+val conf = createSparkConf(loadDefaults = true)
+sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+val map = createExternalMap[Int]
+
+map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+assert(map.numSpills == 0, "map was not supposed to spill")
+
+val it = map.iterator
+assert( it.isInstanceOf[CompletionIterator[_, _]])
--- End diff --

@cloud-fan, the assumption here is that there are two references to the 
underlying map: the upstream iterator and the external map itself.
The destroy method first removes the ref via the upstream iterator, then 
delegates to the method that clears the ref via the external map member 
(`currentMap`, I think), so unless we've missed another ref we should be fine.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190398051
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived = 
Array.fill(numShuffleWriters)(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
--- End diff --

Maybe also print which writers (and how many) we are still waiting on, so that we 
can debug.
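
A hedged sketch of what that extra diagnostic could look like, reusing 
`writerEpochMarkersReceived` and `checkpointIntervalMs` from the quoted diff:

```scala
// Sketch only: list the writer IDs that have not sent an epoch marker yet.
val pendingWriters = writerEpochMarkersReceived.zipWithIndex.collect { case (false, id) => id }
logWarning(s"Completion service failed to make progress after $checkpointIntervalMs ms; " +
  s"still waiting on ${pendingWriters.length} writer(s): ${pendingWriters.mkString(", ")}")
```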


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21414: [SPARK-24368][SQL] Removing columnPruning from CSVOption...

2018-05-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21414
  
**[Test build #91057 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91057/testReport)**
 for PR 21414 at commit 
[`6fdd435`](https://github.com/apache/spark/commit/6fdd43585abcefcfb911039910ce7dd2a55df4e5).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


