[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190370842 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) +val underlyingIt = map.readingIterator +assert( underlyingIt != null ) +val underlyingMapIterator = underlyingIt.upstream +assert(underlyingMapIterator != null) +val underlyingMapIteratorClass = underlyingMapIterator.getClass +assert(underlyingMapIteratorClass.getEnclosingClass == classOf[AppendOnlyMap[_, _]]) + +val underlyingMap = map.currentMap +assert(underlyingMap != null) + +val first50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert( map.numSpills == 0 ) +map.spill(Long.MaxValue, null) +// these asserts try to show that we're no longer holding references to the underlying map. 
+// it'd be nice to use something like +// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala +// (lines 69-89) +assert(map.currentMap == null) +assert(underlyingIt.upstream ne underlyingMapIterator) +assert(underlyingIt.upstream.getClass != underlyingMapIteratorClass) +assert(underlyingIt.upstream.getClass.getEnclosingClass != classOf[AppendOnlyMap[_, _]]) + +val next50Keys = for ( _ <- 0 until 50) yield { + val (k, vs) = it.next + val sortedVs = vs.sorted + assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + k +} +assert(!it.hasNext) +val keys = (first50Keys ++ next50Keys).sorted +assert(keys == (0 until 100)) + } + + test("drop all references to the underlying map once the iterator is exhausted") { --- End diff -- :+1: --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190371425 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +591,25 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream true } } +private def destroy() : Unit = { + freeCurrentMap() + upstream = Iterator.empty +} + +private[ExternalAppendOnlyMap] --- End diff -- Hmm... the class itself is private (slightly relaxed to package-private to ease testing), so I'm not sure what the benefit of making the method public would be. In any case, I think that once we see a use case for making this method public, we'd probably have to further change the iterator/external map classes.
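The `destroy()` method in the diff above frees the current map and points `upstream` at `Iterator.empty`, so nothing keeps the in-memory data reachable once iteration is done. A minimal standalone sketch of that idea follows. `ReleasingIterator`, `replaceUpstream`, `currentUpstream` and the `onExhausted` callback are hypothetical names chosen for illustration; this is not Spark's actual iterator class.

```scala
// Hypothetical sketch: an iterator wrapper that drops its upstream reference when exhausted.
final class ReleasingIterator[A](initial: Iterator[A], onExhausted: () => Unit)
    extends Iterator[A] {
  private var upstream: Iterator[A] = initial
  private var released = false

  // Swap in a replacement source, e.g. one that reads spilled data back from disk.
  def replaceUpstream(next: Iterator[A]): Unit = {
    upstream = next
  }

  // Drop the reference to the upstream so whatever backs it can be garbage collected.
  private def destroy(): Unit = {
    if (!released) {
      released = true
      upstream = Iterator.empty
      onExhausted()
    }
  }

  override def hasNext: Boolean = {
    val more = upstream.hasNext
    if (!more) destroy()
    more
  }

  override def next(): A = {
    val item = upstream.next()
    // Release eagerly: the last element has just been handed out.
    if (!upstream.hasNext) destroy()
    item
  }

  // Exposed only so a test can observe that the reference was swapped.
  def currentUpstream: Iterator[A] = upstream
}
```

A test in the spirit of the suite above could then check `currentUpstream ne originalIterator` after exhaustion, which is a weaker but deterministic substitute for a GC-based reachability assertion.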
[GitHub] spark issue #21415: [SPARK-24244][SPARK-24368][SQL] Passing only required co...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21415 Can one of the admins verify this patch?
[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21390 **[Test build #91064 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91064/testReport)** for PR 21390 at commit [`0df8e4e`](https://github.com/apache/spark/commit/0df8e4ec71971468854b6d778a1899df8df71211).
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21369 **[Test build #91065 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91065/testReport)** for PR 21369 at commit [`e3c61fd`](https://github.com/apache/spark/commit/e3c61fd596cdb6eb6b54a6eee0004ca79cb553af).
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21311 **[Test build #91052 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91052/testReport)** for PR 21311 at commit [`b8b6324`](https://github.com/apache/spark/commit/b8b632450d824d7abf092c10f8e94f6938be1104). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21411: [SPARK-24367][SQL]Parquet: use JOB_SUMMARY_LEVEL instead...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21411 cc @michal-databricks
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21416 Merged build finished. Test FAILed.
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21416 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91068/ Test FAILed.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190403327 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- @@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest { } test("blocks waiting for new rows") { -val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val rdd = new ContinuousShuffleReadRDD( + sparkContext, numPartitions = 1, checkpointIntervalMs = Long.MaxValue) +val epoch = rdd.compute(rdd.partitions(0), ctx) val readRowThread = new Thread { override def run(): Unit = { -// set the non-inheritable thread local -TaskContext.setTaskContext(ctx) -val epoch = rdd.compute(rdd.partitions(0), ctx) -epoch.next().getInt(0) +try { + epoch.next().getInt(0) +} catch { + case _: InterruptedException => // do nothing - expected at test ending +} } } try { readRowThread.start() eventually(timeout(streamingTimeout)) { -assert(readRowThread.getState == Thread.State.WAITING) +assert(readRowThread.getState == Thread.State.TIMED_WAITING) --- End diff -- Why did this change from WAITING to TIMED_WAITING? Can the thread be in either state? Would this cause flakiness?
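On the WAITING versus TIMED_WAITING question: in the JVM, a thread parked in an untimed blocking call reports `Thread.State.WAITING`, while one parked in a call with a timeout reports `Thread.State.TIMED_WAITING`, however large the timeout is. One plausible reading, not confirmed in this thread, is that the assertion changed because the reader in this PR moved from an untimed `queue.take()` to a timed `completion.poll(...)`. The sketch below only illustrates the general JVM behaviour on a plain `LinkedBlockingQueue`; `ThreadStateDemo` and `stateWhileBlocked` are hypothetical helper names, and the one-hour timeout is an arbitrary choice.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object ThreadStateDemo {
  // Run `body` on a daemon thread, wait until that thread parks, and report its state.
  def stateWhileBlocked(body: => Unit): Thread.State = {
    val t = new Thread(new Runnable {
      override def run(): Unit =
        try body catch { case _: InterruptedException => () } // expected when we interrupt below
    })
    t.setDaemon(true)
    t.start()
    var state = t.getState
    val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10)
    // Poll until the thread is observed in one of the two waiting states (or we give up).
    while (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING &&
        System.nanoTime() < deadline) {
      Thread.sleep(10)
      state = t.getState
    }
    t.interrupt()
    t.join()
    state
  }

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[String]()
    // Untimed blocking call on an empty queue.
    println(stateWhileBlocked(queue.take()))
    // Timed blocking call on the same empty queue.
    println(stateWhileBlocked(queue.poll(1, TimeUnit.HOURS)))
  }
}
```

If the blocked thread can only ever sit in one specific blocking call, asserting on a single state should be stable; if the code path could block in either kind of call, the test would need to accept both states.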
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r190405223 --- Diff: core/src/main/scala/org/apache/spark/scheduler/PeakExecutorMetrics.scala --- @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.status.api.v1.PeakMemoryMetrics + +/** + * Records the peak values for executor level metrics. If jvmUsedHeapMemory is -1, then no + * values have been recorded yet. 
+ */ +private[spark] class PeakExecutorMetrics { + private var _jvmUsedHeapMemory = -1L; + private var _jvmUsedNonHeapMemory = 0L; + private var _onHeapExecutionMemory = 0L + private var _offHeapExecutionMemory = 0L + private var _onHeapStorageMemory = 0L + private var _offHeapStorageMemory = 0L + private var _onHeapUnifiedMemory = 0L + private var _offHeapUnifiedMemory = 0L + private var _directMemory = 0L + private var _mappedMemory = 0L + + def jvmUsedHeapMemory: Long = _jvmUsedHeapMemory + + def jvmUsedNonHeapMemory: Long = _jvmUsedNonHeapMemory + + def onHeapExecutionMemory: Long = _onHeapExecutionMemory + + def offHeapExecutionMemory: Long = _offHeapExecutionMemory + + def onHeapStorageMemory: Long = _onHeapStorageMemory + + def offHeapStorageMemory: Long = _offHeapStorageMemory + + def onHeapUnifiedMemory: Long = _onHeapUnifiedMemory + + def offHeapUnifiedMemory: Long = _offHeapUnifiedMemory + + def directMemory: Long = _directMemory + + def mappedMemory: Long = _mappedMemory + + /** + * Compare the specified memory values with the saved peak executor memory + * values, and update if there is a new peak value. 
+ * + * @param executorMetrics the executor metrics to compare + * @return if there is a new peak value for any metric + */ + def compareAndUpdate(executorMetrics: ExecutorMetrics): Boolean = { +var updated: Boolean = false + +if (executorMetrics.jvmUsedHeapMemory > _jvmUsedHeapMemory) { + _jvmUsedHeapMemory = executorMetrics.jvmUsedHeapMemory + updated = true +} +if (executorMetrics.jvmUsedNonHeapMemory > _jvmUsedNonHeapMemory) { + _jvmUsedNonHeapMemory = executorMetrics.jvmUsedNonHeapMemory + updated = true +} +if (executorMetrics.onHeapExecutionMemory > _onHeapExecutionMemory) { + _onHeapExecutionMemory = executorMetrics.onHeapExecutionMemory + updated = true +} +if (executorMetrics.offHeapExecutionMemory > _offHeapExecutionMemory) { + _offHeapExecutionMemory = executorMetrics.offHeapExecutionMemory + updated = true +} +if (executorMetrics.onHeapStorageMemory > _onHeapStorageMemory) { + _onHeapStorageMemory = executorMetrics.onHeapStorageMemory + updated = true +} +if (executorMetrics.offHeapStorageMemory > _offHeapStorageMemory) { + _offHeapStorageMemory = executorMetrics.offHeapStorageMemory --- End diff -- I know spark has this kind of code all over the place already, but I really hate how error prone it is -- way too easy for a copy paste error to result in comparing the wrong two metrics, or updating the wrong value, or forgetting to update this when another metric is added, etc. I just opened this https://github.com/edwinalu/spark/pull/1 as another way to do this that would eliminate a ton of boilerplate IMO. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
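To illustrate the concern about copy-paste errors in the per-field comparisons, here is one table-driven way to track peaks. It is a sketch only and is not taken from the linked pull request: `PeakMetricNames` and `PeakTracker` are hypothetical names, metrics are passed as a plain `Array[Long]` in a fixed order, and every slot starts at -1, which differs from the diff above where only `jvmUsedHeapMemory` starts at -1.

```scala
// Hypothetical sketch: metric names come from the diff above, everything else is assumed.
object PeakMetricNames {
  val all: IndexedSeq[String] = IndexedSeq(
    "jvmUsedHeapMemory", "jvmUsedNonHeapMemory",
    "onHeapExecutionMemory", "offHeapExecutionMemory",
    "onHeapStorageMemory", "offHeapStorageMemory",
    "onHeapUnifiedMemory", "offHeapUnifiedMemory",
    "directMemory", "mappedMemory")
}

final class PeakTracker {
  // One slot per metric; -1 marks "nothing recorded yet".
  private val peaks: Array[Long] = Array.fill(PeakMetricNames.all.length)(-1L)

  def peak(name: String): Long = {
    val i = PeakMetricNames.all.indexOf(name)
    require(i >= 0, s"unknown metric: $name")
    peaks(i)
  }

  /** Returns true if any value in `sample` exceeds the stored peak for its slot. */
  def compareAndUpdate(sample: Array[Long]): Boolean = {
    require(sample.length == peaks.length, "sample must have one value per metric")
    var updated = false
    var i = 0
    while (i < peaks.length) {
      // The same index is used for the comparison and the update, so they cannot drift apart.
      if (sample(i) > peaks(i)) {
        peaks(i) = sample(i)
        updated = true
      }
      i += 1
    }
    updated
  }
}
```

With this layout, adding a metric means adding one name to the list; the comparison loop does not change.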
[GitHub] spark pull request #20929: [SPARK-23772][SQL] Provide an option to ignore co...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r190399084 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2408,4 +2408,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { spark.read.option("mode", "PERMISSIVE").option("encoding", "UTF-8").json(Seq(badJson).toDS()), Row(badJson)) } + + test("SPARK-23772 ignore column of all null values or empty array during schema inference") { + withTempPath { tempDir => + val path = tempDir.getAbsolutePath + Seq( +"""{"a":null, "b":[null, null], "c":null, "d":[[], [null]], "e":{}}""", +"""{"a":null, "b":[null], "c":[], "d": [null, []], "e":{}}""", +"""{"a":null, "b":[], "c":[], "d": null, "e":null}""") +.toDS().write.mode("overwrite").text(path) --- End diff -- Do you need the `overwrite` mode?
[GitHub] spark pull request #20929: [SPARK-23772][SQL] Provide an option to ignore co...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r190401748 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2408,4 +2408,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { spark.read.option("mode", "PERMISSIVE").option("encoding", "UTF-8").json(Seq(badJson).toDS()), Row(badJson)) } + + test("SPARK-23772 ignore column of all null values or empty array during schema inference") { + withTempPath { tempDir => + val path = tempDir.getAbsolutePath + Seq( +"""{"a":null, "b":[null, null], "c":null, "d":[[], [null]], "e":{}}""", +"""{"a":null, "b":[null], "c":[], "d": [null, []], "e":{}}""", +"""{"a":null, "b":[], "c":[], "d": null, "e":null}""") --- End diff -- Could you add a test for the case where `dropFieldIfAllNull` is set to `true` but not all values in a column are `null`?
[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/21372 Yep. Both the JIRA and the PR description are updated.
[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/21372 Retest this please.
[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21372 Merged build finished. Test PASSed.
[GitHub] spark issue #21415: [SPARK-24244][SPARK-24368][SQL] Passing only required co...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21415 **[Test build #91061 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91061/testReport)** for PR 21415 at commit [`0aef16b`](https://github.com/apache/spark/commit/0aef16b5e9017fb398e0df2f3694a1db1f4d7cb8).
[GitHub] spark issue #21394: [SPARK-24329][SQL] Test for skipping multi-space lines
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21394 **[Test build #91063 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91063/testReport)** for PR 21394 at commit [`a2cdc29`](https://github.com/apache/spark/commit/a2cdc2916d70c7031f41b1b9795af0e4b50cc977).
[GitHub] spark issue #21410: [SPARK-24366][SQL] Improving of error messages for type ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21410 **[Test build #91062 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91062/testReport)** for PR 21410 at commit [`26cc2f8`](https://github.com/apache/spark/commit/26cc2f84ee6324db23936e20816d240031211311).
[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21390 retest this please
[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21390 Merged build finished. Test PASSed.
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21311 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91052/ Test FAILed.
[GitHub] spark issue #21390: [SPARK-24340][Core] Clean up non-shuffle disk block mana...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21390 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3521/ Test PASSed.
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21311 Merged build finished. Test FAILed.
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21369 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3522/ Test PASSed.
[GitHub] spark issue #21404: [SPARK-24360][SQL] Support Hive 3.0 metastore
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/21404 I'm investigating a timing issue here. Spark loads the Hive Metastore class lazily. Here, Spark is trying to access Hive metastore tables like `DBS` before they're created.
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21311 retest this please
[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21372 Could you document the bug in both the JIRA and the PR description? We also need to mention which ORC reader is affected.
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21416 **[Test build #91068 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91068/testReport)** for PR 21416 at commit [`ec91220`](https://github.com/apache/spark/commit/ec91220d5e8658500be6d049f8ab3496fc8a914e). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21416#discussion_r190399106 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala --- @@ -397,6 +399,68 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { } } + test("isinSet: Scala Set") { +val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b") +checkAnswer(df.filter($"a".isinSet(Set(1, 2))), + df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isinSet(Set(3, 2))), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isinSet(Set(3, 1))), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1)) + +// Auto casting should work with mixture of different types in Set +checkAnswer(df.filter($"a".isinSet(Set(1.toShort, "2"))), + df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isinSet(Set("3", 2.toLong))), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isinSet(Set(3, "1"))), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1)) + +checkAnswer(df.filter($"b".isinSet(Set("y", "x"))), + df.collect().toSeq.filter(r => r.getString(1) == "y" || r.getString(1) == "x")) +checkAnswer(df.filter($"b".isinSet(Set("z", "x"))), + df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "x")) +checkAnswer(df.filter($"b".isinSet(Set("z", "y"))), + df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "y")) + +val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b") + +intercept[AnalysisException] { + df2.filter($"a".isinSet(Set($"b"))) +} --- End diff -- Let's check the error message to prevent future regressions, such as raising a different AnalysisException.
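The point about checking the error message can be shown without Spark. In the sketch below, `InterceptDemo.intercept` is a hand-rolled stand-in for ScalaTest's `intercept` (which likewise returns the caught exception), and `parsePositive` is a hypothetical function under test. Both failure modes of `parsePositive` satisfy a type-only check for `IllegalArgumentException`, because `NumberFormatException` is a subclass of it; only the message assertion tells them apart.

```scala
import scala.reflect.ClassTag

object InterceptDemo {
  // Stand-in for ScalaTest's `intercept`: returns the caught exception for further inspection.
  def intercept[T <: Throwable](body: => Any)(implicit ct: ClassTag[T]): T = {
    val caught: Option[Throwable] =
      try { body; None } catch { case e: Throwable => Some(e) }
    caught match {
      case Some(e) if ct.runtimeClass.isInstance(e) => e.asInstanceOf[T]
      case Some(e) =>
        throw new AssertionError(s"expected ${ct.runtimeClass.getName}, got $e")
      case None =>
        throw new AssertionError(s"expected ${ct.runtimeClass.getName}, nothing was thrown")
    }
  }

  // Hypothetical function under test with two distinct ways to fail.
  def parsePositive(s: String): Int = {
    val n = s.toInt // throws NumberFormatException for non-numeric input
    require(n > 0, s"value must be positive: $n") // throws IllegalArgumentException
    n
  }
}
```

Usage, mirroring what the review asks for:

```scala
val e = InterceptDemo.intercept[IllegalArgumentException] {
  InterceptDemo.parsePositive("-3")
}
// Asserting on the message pins down which failure was actually raised.
assert(e.getMessage.contains("value must be positive"))
```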
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190401100 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + // Initialize by submitting tasks to read the first row from each writer. + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + /** + * In each call to getNext(), we pull the next row available in the completion queue, and then + * submit another task to read the next row from the writer which returned it. + * + * When a writer sends an epoch marker, we note that it's finished and don't submit another + * task for it in this epoch. The iterator is over once all writers have sent an epoch marker. 
+ */ + override def getNext(): UnsafeRow = { +var nextRow: UnsafeRow = null +while (nextRow == null) { + nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match { +case null => + // Try again if the poll didn't wait long enough to get a real result. + // But we should be getting at least an epoch marker every checkpoint interval. + logWarning( +s"Completion service failed to make progress after $checkpointIntervalMs ms") + null + +// The completion service guarantees this future will be available immediately. +case future => future.get() match { + case ReceiverRow(writerId, r) => +// Start reading the next element in the queue we just took from. +completion.submit(completionTask(writerId)) +r + case ReceiverEpochMarker(writerId) => +// Don't read any more from this queue. If all the writers have sent epoch markers, +// the epoch is over; otherwise we need to poll from the remaining writers. +writerEpochMarkersReceived(writerId) = true +if (writerEpochMarkersReceived.forall(flag => flag)) { + finished = true + // Break out of the while loop and end the iterator. + return null --- End diff -- super nit: I personally find this sort of escape hard to read in code. Consider using a separate flag to end the while loop.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190400926 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + // Initialize by submitting tasks to read the first row from each writer. + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + /** + * In each call to getNext(), we pull the next row available in the completion queue, and then + * submit another task to read the next row from the writer which returned it. + * + * When a writer sends an epoch marker, we note that it's finished and don't submit another + * task for it in this epoch. The iterator is over once all writers have sent an epoch marker. + */ + override def getNext(): UnsafeRow = { +var nextRow: UnsafeRow = null +while (nextRow == null) { + nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match { --- End diff -- move this inside... 
it's hard to follow the pattern `x = // some large code structure`
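The two review comments above (end the loop with a flag rather than `return null`, and assign inside the match rather than wrapping a large expression in `nextRow = ...`) can be sketched together. This is a simplified illustration, not the PR's code: it reads from a single `LinkedBlockingQueue` instead of an `ExecutorCompletionService`, it has no poll timeout, and `Msg`, `DataRow`, `EpochMarker` and `EpochReader` are hypothetical names.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical message types standing in for the receiver messages in the diff.
sealed trait Msg { def writerId: Int }
final case class DataRow(writerId: Int, value: String) extends Msg
final case class EpochMarker(writerId: Int) extends Msg

final class EpochReader(numWriters: Int, queue: LinkedBlockingQueue[Msg]) {
  private val markerSeen = Array.fill(numWriters)(false)
  private var finished = false

  /** Returns the next row, or null once every writer has sent its epoch marker. */
  def getNext(): String = {
    var nextRow: String = null
    // `finished` is part of the loop condition, so no early `return` is needed.
    while (nextRow == null && !finished) {
      queue.take() match {
        case DataRow(_, value) =>
          // Assign inside the branch instead of `nextRow = <large match expression>`.
          nextRow = value
        case EpochMarker(writerId) =>
          markerSeen(writerId) = true
          if (markerSeen.forall(identity)) finished = true
      }
    }
    nextRow
  }
}
```

The loop has a single exit point, and a reader can see both termination conditions in the `while` header.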
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190403472 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- @@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest { } test("blocks waiting for new rows") { -val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val rdd = new ContinuousShuffleReadRDD( + sparkContext, numPartitions = 1, checkpointIntervalMs = Long.MaxValue) +val epoch = rdd.compute(rdd.partitions(0), ctx) val readRowThread = new Thread { override def run(): Unit = { -// set the non-inheritable thread local -TaskContext.setTaskContext(ctx) -val epoch = rdd.compute(rdd.partitions(0), ctx) -epoch.next().getInt(0) +try { + epoch.next().getInt(0) +} catch { + case _: InterruptedException => // do nothing - expected at test ending +} } } try { readRowThread.start() eventually(timeout(streamingTimeout)) { -assert(readRowThread.getState == Thread.State.WAITING) +assert(readRowThread.getState == Thread.State.TIMED_WAITING) } } finally { readRowThread.interrupt() readRowThread.join() } } + + test("multiple writers") { +val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1, numShuffleWriters = 3) +val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint +send( + endpoint, + ReceiverRow(0, unsafeRow("writer0-row0")), + ReceiverRow(1, unsafeRow("writer1-row0")), + ReceiverRow(2, unsafeRow("writer2-row0")), + ReceiverEpochMarker(0), + ReceiverEpochMarker(1), + ReceiverEpochMarker(2) +) + +val firstEpoch = rdd.compute(rdd.partitions(0), ctx) +assert(firstEpoch.toSeq.map(_.getUTF8String(0).toString).toSet == + Set("writer0-row0", "writer1-row0", "writer2-row0")) + } + + test("epoch only ends when all writers send markers") { +val rdd = new ContinuousShuffleReadRDD( + sparkContext, numPartitions = 1, numShuffleWriters = 3, checkpointIntervalMs = 
Long.MaxValue) +val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint +send( + endpoint, + ReceiverRow(0, unsafeRow("writer0-row0")), + ReceiverRow(1, unsafeRow("writer1-row0")), + ReceiverRow(2, unsafeRow("writer2-row0")), + ReceiverEpochMarker(0), + ReceiverEpochMarker(2) +) + +val epoch = rdd.compute(rdd.partitions(0), ctx) +val rows = (0 until 3).map(_ => epoch.next()).toSet +assert(rows.map(_.getUTF8String(0).toString) == + Set("writer0-row0", "writer1-row0", "writer2-row0")) + +// After checking the right rows, block until we get an epoch marker indicating there's no next. +// (Also fail the assertion if for some reason we get a row.) +val readEpochMarkerThread = new Thread { + override def run(): Unit = { +assert(!epoch.hasNext) + } +} + +readEpochMarkerThread.start() +eventually(timeout(streamingTimeout)) { + assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING) --- End diff -- Same question as above: is this the only possible thread state?
[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21372 **[Test build #91055 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91055/testReport)** for PR 21372 at commit [`954d1d9`](https://github.com/apache/spark/commit/954d1d92ade183d8774b75e03cb02e16635cde48). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21372 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21372 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91055/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21372 **[Test build #91070 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91070/testReport)** for PR 21372 at commit [`954d1d9`](https://github.com/apache/spark/commit/954d1d92ade183d8774b75e03cb02e16635cde48). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21410: [SPARK-24366][SQL] Improving of error messages for type ...
Github user MaxGekk commented on the issue: https://github.com/apache/spark/pull/21410 jenkins, retest this, please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190375595 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala --- @@ -585,17 +592,15 @@ class ExternalAppendOnlyMap[K, V, C]( } else { logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " + s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory") -nextUpstream = spillMemoryIteratorToDisk(upstream) +val nextUpstream = spillMemoryIteratorToDisk(upstream) +assert(!upstream.hasNext) hasSpilled = true +upstream = nextUpstream --- End diff -- @cloud-fan, the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself. The destroy method first removes the reference held via the upstream iterator, then delegates to the method that clears the reference held via the external map member (`currentMap`, I think), so unless we've missed another reference we should be fine. As I wrote above, I think there's a potentially more fundamental issue with `CompletionIterator`, which keeps holding references via its `sub` and `completionFunction` members; these might keep some objects from being collected and can be released once the iterator is exhausted. There might be more candidates, like `LazyIterator` and `InterruptibleIterator`; I think this deserves some more investigation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
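The idea raised in the comment above, releasing an iterator's references once it is exhausted, could look roughly like the following. This is a hypothetical standalone wrapper, not Spark's `CompletionIterator` and not code from the PR; the class name `ReleasingIterator` and the `isReleased` accessor are assumptions made for illustration.

```scala
// Hypothetical wrapper: runs a completion callback once, then drops its references
// to the wrapped iterator and the callback so they can be garbage collected.
final class ReleasingIterator[A](
    private var sub: Iterator[A],
    private var onComplete: () => Unit) extends Iterator[A] {

  override def hasNext: Boolean = {
    if (sub == null) {
      // Already exhausted and released.
      false
    } else if (sub.hasNext) {
      true
    } else {
      // Exhausted: clear both fields before invoking the callback exactly once.
      val callback = onComplete
      sub = null
      onComplete = null
      callback()
      false
    }
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("iterator exhausted")
    sub.next()
  }

  // True once the wrapped iterator reference has been dropped.
  def isReleased: Boolean = sub == null
}
```

Whether the real iterators can be changed this way depends on who else holds the wrapped iterator, which is the investigation the comment asks for.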
[GitHub] spark pull request #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReade...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21295#discussion_r190377736 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala --- @@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } } + + test("SPARK-24230: filter row group using dictionary") { +withSQLConf(("parquet.filter.dictionary.enabled", "true")) { --- End diff -- Looks like the problem is a bug in Parquet. It is using the [stats property instead of the dictionary property](https://github.com/apache/parquet-mr/blob/8bbc6cb95fd9b4b9e86c924ca1e40fd555ecac1d/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java#L83). This is minor because there is almost no reason to turn either one off now that we've built more confidence in the filters. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase w...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21295 Thanks for your investigation! Also, congratulations! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21266: [SPARK-24206][SQL] Improve DataSource read benchm...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21266 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21379: [SPARK-24327][SQL] Add an option to quote a parti...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21379#discussion_r190382338 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala --- @@ -78,7 +79,12 @@ private[sql] object JDBCRelation extends Logging { // Overflow and silliness can happen if you subtract then divide. // Here we get a little roundoff, but that's (hopefully) OK. val stride: Long = upperBound / numPartitions - lowerBound / numPartitions -val column = partitioning.column +val column = if (jdbcOptions.quotePartitionColumnName) { + val dialect = JdbcDialects.get(jdbcOptions.url) + dialect.quoteIdentifier(partitioning.column) --- End diff -- Yeah --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21342 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21372#discussion_r190383386 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala --- @@ -169,6 +170,14 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } } } + + test("SPARK-24322 Fix incorrect workaround for bug in java.sql.Timestamp") { +withTempPath { path => + val ts = Timestamp.valueOf("1900-05-05 12:34:56.000789") + Seq(ts).toDF.write.orc(path.getCanonicalPath) + checkAnswer(spark.read.orc(path.getCanonicalPath), Row(ts)) +} + } --- End diff -- `OrcSourceSuite` is dedicated to the `native` ORC reader. The `hive` ORC reader is covered by `HiveOrcSourceSuite`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21342 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21404: [SPARK-24360][SQL] Support Hive 3.0 metastore
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21404 @wangyum I do not think we should deprecate the support of the previous versions of Hive metastore. Many Spark users are still using them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21399: [SPARK-22269][BUILD] Run Java linter via SBT for Jenkins
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/21399 Checked locally and looks good. Feel free to merge if you don't want to address the comment above in this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21404: [SPARK-24360][SQL] Support Hive 3.0 metastore
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21404 @dongjoon-hyun Thanks for your investigation! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190389782 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorExited +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsEventHandler( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventProcessorExecutor: ScheduledExecutorService) extends Logging { + + import ExecutorPodsEventHandler._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]() + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. + + // Executor IDs that have been requested from Kubernetes but are not running yet. + private val pendingExecutors = mutable.Set.empty[Long] + + // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the + // executors that are running. 
But, here we choose instead to maintain all state within this + // class from the persecptive of the k8s API. Therefore whether or not this scheduler loop + // believes an executor is running is dictated by the K8s API rather than Spark's RPC events. + // We may need to consider where these perspectives may differ and which perspective should + // take precedence. + private val runningExecutors = mutable.Set.empty[Long] + + private var eventProcessorFuture: Future[_] = _ + + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { +require(eventProcessorFuture == null, "Cannot start event processing twice.") +logInfo(s"Starting Kubernetes executor pods event handler for application with" + + s" id $applicationId.") +val eventProcessor = new Runnable { + override def run(): Unit = { +Utils.tryLogNonFatalError { + processEvents(applicationId, schedulerBackend) +} + } +} +eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay( + eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { +if (eventProcessorFuture != null) { + eventProcessorFuture.cancel(true) + eventProcessorFuture = null +} + } + + private def processEvents( + applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) { +val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size()) +
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21416 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21416 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3524/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190402783 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + // Initialize by submitting tasks to read the first row from each writer. + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + /** + * In each call to getNext(), we pull the next row available in the completion queue, and then + * submit another task to read the next row from the writer which returned it. + * + * When a writer sends an epoch marker, we note that it's finished and don't submit another + * task for it in this epoch. The iterator is over once all writers have sent an epoch marker. 
+ */ + override def getNext(): UnsafeRow = { +var nextRow: UnsafeRow = null +while (nextRow == null) { + nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match { +case null => + // Try again if the poll didn't wait long enough to get a real result. + // But we should be getting at least an epoch marker every checkpoint interval. + logWarning( +s"Completion service failed to make progress after $checkpointIntervalMs ms") + null + +// The completion service guarantees this future will be available immediately. +case future => future.get() match { + case ReceiverRow(writerId, r) => +// Start reading the next element in the queue we just took from. +completion.submit(completionTask(writerId)) +r + case ReceiverEpochMarker(writerId) => +// Don't read any more from this queue. If all the writers have sent epoch markers, +// the epoch is over; otherwise we need to poll from the remaining writers. +writerEpochMarkersReceived(writerId) = true +if (writerEpochMarkersReceived.forall(flag => flag)) { + finished = true + // Break out of the while loop and end the iterator. + return null --- End diff -- Actually ... finished is the flag, just use that for the while loop, that is, `while (!finished && nextRow == null)`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
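The suggestion above, driving the loop with the `finished` flag instead of returning early, can be sketched in isolation as follows. This is a simplified stand-in, not the PR's `UnsafeRowReceiver`: it assumes a single shared queue rather than one queue per writer with a completion service, and the types `Msg`, `Row`, `EpochMarker` and `EpochReader` are hypothetical.

```scala
import java.util.concurrent.LinkedBlockingQueue

sealed trait Msg
final case class Row(writerId: Int, value: String) extends Msg
final case class EpochMarker(writerId: Int) extends Msg

// Hypothetical reader: one shared queue, one "marker received" flag per writer.
final class EpochReader(queue: LinkedBlockingQueue[Msg], numWriters: Int) {
  private val markersReceived = Array.fill(numWriters)(false)
  private var finished = false

  // Returns the next row value, or null once every writer has sent its epoch marker.
  def getNext(): String = {
    var nextRow: String = null
    // The flag ends the loop, so no early `return null` is needed.
    while (!finished && nextRow == null) {
      queue.take() match {
        case Row(_, value) =>
          nextRow = value
        case EpochMarker(writerId) =>
          markersReceived(writerId) = true
          if (markersReceived.forall(identity)) finished = true
      }
    }
    nextRow
  }

  def isFinished: Boolean = finished
}
```

With this shape the method has a single exit point, and `nextRow` is still `null` when the epoch ends, which matches the contract of returning `null` after setting `finished`.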
[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21346 **[Test build #91056 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91056/testReport)** for PR 21346 at commit [`3098b9c`](https://github.com/apache/spark/commit/3098b9cd9ffc29517b446bb660fe5be9f0031cc1). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21319: [SPARK-24267][SQL] explicitly keep DataSourceReader in D...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21319 @cloud-fan, what about adding support for v2 pushdown in the stats visitor instead? Here's the idea: when the visitor hits a `Filter` or a `Project`, it tries to match the plan using `PhysicalOperation`. If that works and the underlying relation is a `DataSourceV2Relation`, it does the pushdown to configure a reader and return stats from it. That would give us the ability to do pushdown on conversion to physical plan, but we'd get correct stats. The only drawback is that we would temporarily make the stats code a bit larger, but at least it is separate so we can get the logical plans right and fix stats on a separate schedule. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
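A toy model of the proposal above may make the control flow easier to follow. Everything here is hypothetical and deliberately tiny: `Plan`, `V2Relation`, `OtherRelation`, `Filter`, `Project` and `StatsVisitorSketch` are illustrative stand-ins, not Spark's logical plan classes, `PhysicalOperation`, or `DataSourceV2Relation`, and "stats" is reduced to a row count.

```scala
sealed trait Plan
final case class V2Relation(rows: Seq[Map[String, Int]]) extends Plan
final case class OtherRelation(estimatedRows: Long) extends Plan
final case class Filter(predicate: Map[String, Int] => Boolean, child: Plan) extends Plan
final case class Project(columns: Seq[String], child: Plan) extends Plan

object StatsVisitorSketch {
  type Predicate = Map[String, Int] => Boolean

  // Peels Filter/Project nodes off the top of the plan and returns the collected
  // predicates together with the node underneath them.
  private def unwrap(plan: Plan): (Seq[Predicate], Plan) = plan match {
    case Filter(p, child) =>
      val (ps, leaf) = unwrap(child)
      (p +: ps, leaf)
    case Project(_, child) => unwrap(child)
    case leaf => (Nil, leaf)
  }

  def rowCount(plan: Plan): Long = unwrap(plan) match {
    // Pushdown case: apply the predicates to the relation and report what is left,
    // standing in for "configure a reader and return stats from it".
    case (predicates, V2Relation(rows)) =>
      rows.count(row => predicates.forall(p => p(row))).toLong
    // No pushdown support: the filters do not refine the estimate.
    case (_, OtherRelation(estimated)) => estimated
    case (_, other) => throw new IllegalStateException(s"unexpected leaf: $other")
  }
}
```

The point of the sketch is only the separation: the plan nodes stay untouched, and the pushdown-aware logic lives entirely in the stats computation.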
[GitHub] spark pull request #21415: [SPARK-24244][SPARK-24368][SQL] Passing only requ...
GitHub user MaxGekk opened a pull request: https://github.com/apache/spark/pull/21415

[SPARK-24244][SPARK-24368][SQL] Passing only required columns to the CSV parser

## What changes were proposed in this pull request?

The uniVocity parser allows specifying only the required column names or indexes for [parsing](https://www.univocity.com/pages/parsers-tutorial), for example:

```
// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);
```

In this PR, I propose to extract the indexes from the required schema and pass them to the CSV parser. Benchmarks show the following improvements when parsing 1000 columns:

```
Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2
```

**Note**: Compared to the current implementation, the changes can return different results for malformed rows in the `DROPMALFORMED` and `FAILFAST` modes if only a subset of all columns is requested. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.

## How was this patch tested?

It was tested by a new test that selects 3 columns out of 15, by existing tests, and by new benchmarks.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/MaxGekk/spark-1 csv-column-pruning2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21415.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21415 commit 9cffa0fccc33552e8fce3580a9a665b022f5bf22 Author: Maxim Gekk Date: 2018-03-21T20:03:11Z Adding tests for select only requested columns commit fdbcbe3536aee04e6a84b72ac319726614416bc3 Author: Maxim Gekk Date: 2018-03-21T20:42:08Z Select indexes of required columns only commit 578f47b0f32a76caf6c9ede8763c9cf85a1c83e9 Author: Maxim Gekk Date: 2018-03-24T10:41:29Z Fix the case when number of parsed fields are not matched to required schema commit 0f942c308dca173dad8f421e893066b8c03d35a3 Author: Maxim Gekk Date: 2018-03-24T11:07:55Z Using selectIndexes if required number of columns are less than its total number.
commit c4b11601e9c264729e141fff3dc653d868a7ad69 Author: Maxim Gekk Date: 2018-03-24T11:48:43Z Fix the test: force to read all columns commit 8cf6eab952d79628cb8ee2ff7b92dadae60ec686 Author: Maxim Gekk Date: 2018-04-06T20:55:35Z Fix merging conflicts commit 5b2f0b9d7346f927842bc1a2089a7299876f1894 Author: Maxim Gekk Date: 2018-04-29T11:52:08Z Benchmarks for many columns commit 6d1e902c0011e88dbafb65c4ad6e7431370ed12d Author: Maxim Gekk Date: 2018-04-29T12:59:58Z Make size of requiredSchema equals to amount of selected columns commit 4525795f7337cbd081f569cd79d7f90cb58edbee Author: Maxim Gekk Date: 2018-04-29T13:36:54Z Removing selection of all columns commit 8809cecf93d8e7a97eca827d9e8637a7eb5b2449 Author: Maxim Gekk Date: 2018-04-29T13:50:44Z Updating benchmarks for select indexes commit dc97ceb96185ed2eaa05fbe1aee8ecfe8ccb7e7d Author: Maxim Gekk Date: 2018-05-05T19:19:17Z Addressing Herman's review comments commit 51b31483263e13cd85b19b3efea65188945eda99 Author: Maxim Gekk Date: 2018-05-10T18:39:38Z Updated benchmark result for recent changes commit e3958b1468b490b548574b53512f0d83850e6f6f Author: Maxim Gekk Date: 2018-05-10T18:46:17Z Add ticket number to test title commit a4a0a549156a15011c33c7877a35f244d75b7a4f Author: Maxim Gekk Date: 2018-05-10T19:02:24Z Removing unnecessary benchmark commit fa860157c982846524bd8f151daf8a2154117b34 Author: Maxim Gekk Date: 2018-05-13T18:49:49Z Updating the migration guide commit 15528d20a74904c14c58bf3ad54c9a552c519430 Author: Maxim Gekk Date: 2018-05-13T18:55:06Z Moving some values back as it was. 
commit f90daa7ea33d119be978c27de10978c2d6281e25 Author: Maxim Gekk Date: 2018-05-13T18:58:20Z Renaming the test title commit 4d9873d39277b9cbaee892957c06bfc2cb9a52f1 Author: Maxim Gekk Date: 2018-05-17T20:02:47Z Improving of the migration guide commit 7dcfc7a7664fcd5311cb352f0ea7a24b3cc1c639 Author: Maxim Gekk Date: 2018-05-17T20:12:49Z Merge remote-tracking branch 'origin/master' into csv-column-pruning # Conflicts: # sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVBenchmarks.scala #
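As an illustration of the column-pruning idea in the pull request description above (extracting indexes from the required schema), here is a minimal sketch in plain Scala. It does not use Spark or the uniVocity API; `ColumnPruningSketch`, `requiredIndexes` and `project` are hypothetical names, and schemas are modeled as plain sequences of column names.

```scala
object ColumnPruningSketch {
  // Positions of the required columns within the full data schema, in the order
  // the required schema lists them.
  def requiredIndexes(dataSchema: Seq[String], requiredSchema: Seq[String]): Seq[Int] =
    requiredSchema.map { name =>
      val index = dataSchema.indexOf(name)
      require(index >= 0, s"column $name is not in the data schema")
      index
    }

  // Stand-in for a parser that was told to keep only the selected indexes.
  def project(tokens: Array[String], indexes: Seq[Int]): Array[String] =
    indexes.map(i => tokens(i)).toArray
}
```

For a five-column schema `a, b, c, d, e` and a required schema `e, a, b`, the helper yields the same index triple as the `selectIndexes(4, 0, 1)` call quoted in the description.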
[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...
Github user JoshRosen commented on the issue: https://github.com/apache/spark/pull/21342 Updated changes LGTM. Thanks for working on this! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21266 **[Test build #91049 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91049/testReport)** for PR 21266 at commit [`5eab1a5`](https://github.com/apache/spark/commit/5eab1a51badca648c52287f7eb88dcdb481c5c3b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReade...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21295#discussion_r190378887 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java --- @@ -225,7 +226,8 @@ protected void initialize(String path, List columns) throws IOException this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema); this.reader = new ParquetFileReader( config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns()); -for (BlockMetaData block : blocks) { +// use the blocks from the reader in case some do not match filters and will not be read +for (BlockMetaData block : reader.getRowGroups()) { --- End diff -- Dictionary filtering is off by default in 1.8.x. It was enabled after we built confidence in its correctness in 1.9.x. We should backport this fix to 2.3.x also, but the only downside to not having it is that dictionary filtering will throw an exception when it is enabled. So the feature just isn't available. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmar...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21288#discussion_r190382044 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala --- @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.functions.monotonically_increasing_id +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure read performance with Filter pushdown. 
+ * To run this: + * spark-submit --class + */ +object FilterPushdownBenchmark { + val conf = new SparkConf() +.setAppName("FilterPushdownBenchmark") +.setIfMissing("spark.master", "local[1]") +.setIfMissing("spark.driver.memory", "3g") +.setIfMissing("spark.executor.memory", "3g") +.setIfMissing("orc.compression", "snappy") +.setIfMissing("spark.sql.parquet.compression.codec", "snappy") + + private val spark = SparkSession.builder().config(conf).getOrCreate() + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private def prepareTable( + dir: File, numRows: Int, width: Int, useStringForValue: Boolean): Unit = { +import spark.implicits._ +val selectExpr = (1 to width).map(i => s"CAST(value AS STRING) c$i") +val valueCol = if (useStringForValue) { + monotonically_increasing_id().cast("string") +} else { + monotonically_increasing_id() +} +val df = spark.range(numRows).map(_ => Random.nextLong).selectExpr(selectExpr: _*) + .withColumn("value", valueCol) + .sort("value") + +saveAsOrcTable(df, dir.getCanonicalPath + "/orc") +saveAsParquetTable(df, dir.getCanonicalPath + "/parquet") + } + + private def prepareStringDictTable( + dir: File, numRows: Int, numDistinctValues: Int, width: Int): Unit = { +val selectExpr = (0 to width).map { + case 0 => s"CAST(id % $numDistinctValues AS STRING) AS value" + case i => s"CAST(rand() AS STRING) c$i" +} +val df = 
spark.range(numRows).selectExpr(selectExpr: _*).sort("value") + +saveAsOrcTable(df, dir.getCanonicalPath + "/orc") +saveAsParquetTable(df, dir.getCanonicalPath + "/parquet") + } + + private def saveAsOrcTable(df: DataFrame, dir: String): Unit = { +df.write.mode("overwrite").orc(dir) +spark.read.orc(dir).createOrReplaceTempView("orcTable") + } + + private def saveAsParquetTable(df: DataFrame, dir: String): Unit = { +df.write.mode("overwrite").parquet(dir) +spark.read.parquet(dir).createOrReplaceTempView("parquetTable") + } + + def filterPushDownBenchmark( + values: Int, + title: String, + whereExpr: String, + selectExpr: String = "*"): Unit = { +val benchmark = new Benchmark(title, values, minNumIters = 5) + +Seq(false, true).foreach { pushDownEnabled => + val
[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21372 Before we do the merge, could you address the comment: https://github.com/apache/spark/pull/21372#discussion_r190073105? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21369 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21399: [SPARK-22269][BUILD] Run Java linter via SBT for ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21399#discussion_r190383825 --- Diff: dev/run-tests.py --- @@ -574,8 +574,7 @@ def main(): or f.endswith("checkstyle.xml") or f.endswith("checkstyle-suppressions.xml") for f in changed_files): -# run_java_style_checks() -pass +run_java_style_checks() --- End diff -- This is really minor, but now you'll be running this when building with maven too. That could be avoided by checking that the build is not using maven; and also changing the maven target from `package` to `verify` so that these checks run. You can add `[build-maven]` to the PR title to try it out (or locally with `AMPLAB_JENKINS_BUILD_TOOL=maven`). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21311 **[Test build #91066 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91066/testReport)** for PR 21311 at commit [`b8b6324`](https://github.com/apache/spark/commit/b8b632450d824d7abf092c10f8e94f6938be1104).
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190386369 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorExited +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsEventHandler( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventProcessorExecutor: ScheduledExecutorService) extends Logging { + + import ExecutorPodsEventHandler._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]() + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. + + // Executor IDs that have been requested from Kubernetes but are not running yet. + private val pendingExecutors = mutable.Set.empty[Long] + + // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the + // executors that are running. 
But, here we choose instead to maintain all state within this + // class from the perspective of the k8s API. Therefore whether or not this scheduler loop + // believes an executor is running is dictated by the K8s API rather than Spark's RPC events. + // We may need to consider where these perspectives may differ and which perspective should + // take precedence. + private val runningExecutors = mutable.Set.empty[Long] + + private var eventProcessorFuture: Future[_] = _ + + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { +require(eventProcessorFuture == null, "Cannot start event processing twice.") +logInfo(s"Starting Kubernetes executor pods event handler for application with" + + s" id $applicationId.") +val eventProcessor = new Runnable { + override def run(): Unit = { +Utils.tryLogNonFatalError { + processEvents(applicationId, schedulerBackend) +} + } +} +eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay( + eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { +if (eventProcessorFuture != null) { + eventProcessorFuture.cancel(true) + eventProcessorFuture = null +} + } + + private def processEvents( + applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) { +val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size()) +
[GitHub] spark issue #21411: [SPARK-24367][SQL]Parquet: use JOB_SUMMARY_LEVEL instead...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21411 **[Test build #91067 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91067/testReport)** for PR 21411 at commit [`3a6a87b`](https://github.com/apache/spark/commit/3a6a87ba0e0bcb36a7a023edbd35fe411ed2fd6d).
[GitHub] spark issue #21411: [SPARK-24367][SQL]Parquet: use JOB_SUMMARY_LEVEL instead...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21411 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3523/ Test PASSed.
[GitHub] spark issue #21411: [SPARK-24367][SQL]Parquet: use JOB_SUMMARY_LEVEL instead...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21411 Merged build finished. Test PASSed.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190396813 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -42,16 +47,24 @@ case class ContinuousShuffleReadPartition(index: Int, queueSize: Int) extends Pa * RDD at the map side of each continuous processing shuffle task. Upstream tasks send their * shuffle output to the wrapped receivers in partitions of this RDD; each of the RDD's tasks * poll from their receiver until an epoch marker is sent. + * + * @param sc the RDD context + * @param numPartitions the number of read partitions for this RDD + * @param queueSize the size of the row buffers to use + * @param numShuffleWriters the number of continuous shuffle writers feeding into this RDD + * @param checkpointIntervalMs the checkpoint interval of the streaming query */ class ContinuousShuffleReadRDD( sc: SparkContext, numPartitions: Int, -queueSize: Int = 1024) +queueSize: Int = 1024, +numShuffleWriters: Int = 1, +checkpointIntervalMs: Long = 1000) --- End diff -- Isn't this the same as epochInterval? The term "epoch" is better known in the code than "checkpoint".
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21416 **[Test build #91069 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91069/testReport)** for PR 21416 at commit [`6459015`](https://github.com/apache/spark/commit/6459015fd923371b1515177db477afe1ce2813aa).
[GitHub] spark issue #21385: [SPARK-24234][SS] Support multiple row writers in contin...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21385 **[Test build #91054 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91054/testReport)** for PR 21385 at commit [`e02d714`](https://github.com/apache/spark/commit/e02d714f6c6774eb275bf86cc81e09dd80f40b11). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21414: [SPARK-24368][SQL] Removing columnPruning from CSVOption...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21414 Merged build finished. Test PASSed.
[GitHub] spark issue #21414: [SPARK-24368][SQL] Removing columnPruning from CSVOption...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21414 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91057/ Test PASSed.
[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21346 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91056/ Test FAILed.
[GitHub] spark issue #21346: [SPARK-6237][NETWORK] Network-layer changes to allow str...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21346 Merged build finished. Test FAILed.
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21266 LGTM Thanks! Merged to master.
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190385827 --- Diff: pom.xml --- @@ -760,6 +760,12 @@ 1.10.19 test + --- End diff -- We always add the dependency to the top-level pom and then, in the lower-level poms, reference the dependent modules without listing their versions.
[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21416

[SPARK-24371] [SQL] Added isinSet in DataFrame API for Scala and Java.

## What changes were proposed in this pull request?

Implemented **`isinSet`** in DataFrame API for both Scala and Java, so users can do

```scala
val profileDF = Seq(
  Some(1), Some(2), Some(3), Some(4),
  Some(5), Some(6), Some(7), None
).toDF("profileID")

val validUsers: Set[Any] = Set(6, 7.toShort, 8L, "3")

val result = profileDF.withColumn("isValid", $"profileID".isinSet(validUsers))

result.show(10)
"""
  +---------+-------+
  |profileID|isValid|
  +---------+-------+
  |        1|  false|
  |        2|  false|
  |        3|   true|
  |        4|  false|
  |        5|  false|
  |        6|   true|
  |        7|   true|
  |     null|   null|
  +---------+-------+
""".stripMargin
```

Two new rules in the logical plan optimizers are added.

1. When there is only one element in the **`Set`**, the physical plan will be optimized to **`EqualTo`**, so predicate pushdown can be used.

```scala
profileDF.filter( $"profileID".isinSet(Set(6))).explain(true)
"""
  |== Physical Plan ==
  |*(1) Project [profileID#0]
  |+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
  |   +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
  |     PartitionFilters: [],
  |     PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
  |     ReadSchema: struct
""".stripMargin
```

2. When the **`Set`** is empty, and the input is nullable, the logical plan will be simplified to

```scala
profileDF.filter( $"profileID".isinSet(Set())).explain(true)
"""
  |== Optimized Logical Plan ==
  |Filter if (isnull(profileID#0)) null else false
  |+- Relation[profileID#0] parquet
""".stripMargin
```

TODO:

1. For multiple conditions whose count is below a certain threshold, we should still allow predicate pushdown.
2. Optimize the **`In`** using **`tableswitch`** or **`lookupswitch`** when the number of categories is low and they are **`Int`** or **`Long`**.
3. The default immutable hash-tree set is slow to query, so we should benchmark different set implementations for faster lookups.
4. **`filter(if (condition) null else false)`** can be optimized to false.

## How was this patch tested?

Several unit tests are added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dbtsai/spark optimize-set

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21416.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21416

commit ec91220d5e8658500be6d049f8ab3496fc8a914e
Author: DB Tsai
Date: 2018-05-17T00:21:14Z

    Added isinSet in DataFrame API for Scala and Java.
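The empty-set rule and TODO item 4 both follow from SQL's three-valued logic. Below is a minimal sketch in plain Scala rather than Spark, assuming `Option[Boolean]` as a stand-in for a nullable SQL boolean (`None` plays the role of null). The names `ThreeValuedIn`, `inSet`, and `keptByFilter` are hypothetical helpers for illustration, not part of the proposed API.

```scala
object ThreeValuedIn {
  // Hypothetical model: None stands in for SQL NULL.
  // Simplification: the set itself is assumed to contain no nulls.
  def inSet[A](value: Option[A], set: Set[A]): Option[Boolean] =
    value.map(v => set.contains(v))

  // A filter keeps a row only when the predicate is definitely true,
  // so a null predicate drops the row just as false does.
  def keptByFilter(predicate: Option[Boolean]): Boolean =
    predicate.contains(true)

  val profileIds: Seq[Option[Int]] = Seq(Some(1), Some(6), None)

  // Empty set: a null input yields null, any other input yields false.
  val emptyResults: Seq[Option[Boolean]] =
    profileIds.map(id => inSet(id, Set.empty[Int]))

  // No row survives the filter, which is why the whole predicate
  // could in principle be folded to false inside a filter.
  val survivors: Seq[Option[Int]] =
    profileIds.filter(id => keptByFilter(inSet(id, Set.empty[Int])))
}
```

In this model the null-versus-false distinction matters for a projected column such as `isValid`, but not for a filter, which is the situation TODO item 4 describes.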
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190398282 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorExited +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsEventHandler( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventProcessorExecutor: ScheduledExecutorService) extends Logging { + + import ExecutorPodsEventHandler._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]() + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. + + // Executor IDs that have been requested from Kubernetes but are not running yet. + private val pendingExecutors = mutable.Set.empty[Long] + + // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the + // executors that are running. 
But, here we choose instead to maintain all state within this + // class from the perspective of the k8s API. Therefore whether or not this scheduler loop + // believes an executor is running is dictated by the K8s API rather than Spark's RPC events. + // We may need to consider where these perspectives may differ and which perspective should + // take precedence. + private val runningExecutors = mutable.Set.empty[Long] + + private var eventProcessorFuture: Future[_] = _ + + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { +require(eventProcessorFuture == null, "Cannot start event processing twice.") +logInfo(s"Starting Kubernetes executor pods event handler for application with" + + s" id $applicationId.") +val eventProcessor = new Runnable { + override def run(): Unit = { +Utils.tryLogNonFatalError { + processEvents(applicationId, schedulerBackend) +} + } +} +eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay( + eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { +if (eventProcessorFuture != null) { + eventProcessorFuture.cancel(true) + eventProcessorFuture = null +} + } + + private def processEvents( + applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) { +val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size()) +
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21416 Merged build finished. Test PASSed.
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21416 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3525/ Test PASSed.
[GitHub] spark issue #21385: [SPARK-24234][SS] Support multiple row writers in contin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21385 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91054/ Test PASSed.
[GitHub] spark issue #21385: [SPARK-24234][SS] Support multiple row writers in contin...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21385 Merged build finished. Test PASSed.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190402584 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + // Initialize by submitting tasks to read the first row from each writer. + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + /** + * In each call to getNext(), we pull the next row available in the completion queue, and then + * submit another task to read the next row from the writer which returned it. + * + * When a writer sends an epoch marker, we note that it's finished and don't submit another + * task for it in this epoch. The iterator is over once all writers have sent an epoch marker. 
+ */ + override def getNext(): UnsafeRow = { +var nextRow: UnsafeRow = null +while (nextRow == null) { + nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match { +case null => + // Try again if the poll didn't wait long enough to get a real result. + // But we should be getting at least an epoch marker every checkpoint interval. + logWarning( +s"Completion service failed to make progress after $checkpointIntervalMs ms") + null + +// The completion service guarantees this future will be available immediately. +case future => future.get() match { + case ReceiverRow(writerId, r) => +// Start reading the next element in the queue we just took from. +completion.submit(completionTask(writerId)) +r + case ReceiverEpochMarker(writerId) => +// Don't read any more from this queue. If all the writers have sent epoch markers, +// the epoch is over; otherwise we need to poll from the remaining writers. +writerEpochMarkersReceived(writerId) = true +if (writerEpochMarkersReceived.forall(flag => flag)) { + finished = true + // Break out of the while loop and end the iterator. + return null +} else { + // Poll again for the next completion result. + null +} --- End diff -- If you put `nextRow = newReceivedRow` inside the `ReceiverRow` case, this else clause is not needed.
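One way to read the suggestion above, sketched in plain Scala without Spark: the row is assigned inside the row case, so the other branches no longer have to yield `null`. Everything here is a simplified stand-in under stated assumptions. `Row`, `EpochMarker`, `readEpoch`, and `pollMs` are hypothetical names replacing `ReceiverRow`, `ReceiverEpochMarker`, the iterator, and `checkpointIntervalMs`; the loop condition on `finished` replaces the early `return null`, and this is not the code that was merged.

```scala
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors, LinkedBlockingQueue, TimeUnit}

object MultiWriterReadSketch {
  // Simplified stand-ins for UnsafeRowReceiverMessage and its subclasses.
  sealed trait Message { def writerId: Int }
  case class Row(writerId: Int, value: String) extends Message
  case class EpochMarker(writerId: Int) extends Message

  /** Drains one epoch from all writers' queues, returning rows in arrival order. */
  def readEpoch(queues: Array[LinkedBlockingQueue[Message]], pollMs: Long): Seq[String] = {
    val executor = Executors.newFixedThreadPool(queues.length)
    val completion = new ExecutorCompletionService[Message](executor)
    def task(writerId: Int): Callable[Message] = new Callable[Message] {
      override def call(): Message = queues(writerId).take()
    }
    // One outstanding read per writer, as in the diff above.
    queues.indices.foreach(id => completion.submit(task(id)))

    val markersReceived = Array.fill(queues.length)(false)
    var finished = false

    def getNext(): String = {
      var nextRow: String = null
      while (nextRow == null && !finished) {
        completion.poll(pollMs, TimeUnit.MILLISECONDS) match {
          case null => // timed out; fall through and poll again
          case future => future.get() match {
            case Row(writerId, value) =>
              completion.submit(task(writerId))
              nextRow = value // assigned here, so no branch has to yield null
            case EpochMarker(writerId) =>
              markersReceived(writerId) = true
              finished = markersReceived.forall(identity)
          }
        }
      }
      nextRow
    }

    val rows = scala.collection.mutable.ArrayBuffer.empty[String]
    try {
      var row = getNext()
      while (row != null) { rows += row; row = getNext() }
    } finally executor.shutdownNow()
    rows.toList
  }
}
```

Rows from a single writer keep their order because a writer's next read is submitted only after its previous one has been consumed; ordering across writers is not deterministic.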
[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r190372105 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.ExecutorExited +import org.apache.spark.util.Utils + +private[spark] class ExecutorPodsEventHandler( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +eventProcessorExecutor: ScheduledExecutorService) extends Logging { + + import ExecutorPodsEventHandler._ + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]() + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Use sets of ids instead of counters to be able to handle duplicate events. + + // Executor IDs that have been requested from Kubernetes but are not running yet. + private val pendingExecutors = mutable.Set.empty[Long] + + // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the + // executors that are running. 
But, here we choose instead to maintain all state within this + // class from the perspective of the k8s API. Therefore whether or not this scheduler loop + // believes an executor is running is dictated by the K8s API rather than Spark's RPC events. + // We may need to consider where these perspectives may differ and which perspective should + // take precedence. + private val runningExecutors = mutable.Set.empty[Long] + + private var eventProcessorFuture: Future[_] = _ + + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { +require(eventProcessorFuture == null, "Cannot start event processing twice.") +logInfo(s"Starting Kubernetes executor pods event handler for application with" + + s" id $applicationId.") +val eventProcessor = new Runnable { + override def run(): Unit = { +Utils.tryLogNonFatalError { + processEvents(applicationId, schedulerBackend) +} + } +} +eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay( + eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { +if (eventProcessorFuture != null) { + eventProcessorFuture.cancel(true) + eventProcessorFuture = null +} + } + + private def processEvents( + applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) { +val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size()) +
[GitHub] spark issue #21415: [SPARK-24244][SPARK-24368][SQL] Passing only required co...
Github user MaxGekk commented on the issue: https://github.com/apache/spark/pull/21415 The difference between this PR and #21296 is that `columnPruning` is passed to `CSVOptions` as a parameter. It should fix the flaky `UnivocityParserSuite`.
[GitHub] spark pull request #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReade...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21295#discussion_r190379077 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java --- @@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString); this.reader = new ParquetFileReader( configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns()); -for (BlockMetaData block : blocks) { +// use the blocks from the reader in case some do not match filters and will not be read --- End diff -- Yes, we will need to backport this to the 2.3.x line. No rush to make it for 2.3.1 though, since dictionary filtering is off by default and this isn't a correctness problem.
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21266 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91049/ Test PASSed.
[GitHub] spark issue #21295: [SPARK-24230][SQL] Fix SpecificParquetRecordReaderBase w...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21295 Thanks for looking at this, everyone. Sorry for the delay in updating it; I'm currently out on paternity leave and don't have a lot of time. I'll get an update pushed sometime soon though.
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21266 Merged build finished. Test PASSed.
[GitHub] spark pull request #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21372#discussion_r190382953 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala --- @@ -169,6 +170,14 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll { } } } + + test("SPARK-24322 Fix incorrect workaround for bug in java.sql.Timestamp") { +withTempPath { path => + val ts = Timestamp.valueOf("1900-05-05 12:34:56.000789") + Seq(ts).toDF.write.orc(path.getCanonicalPath) + checkAnswer(spark.read.orc(path.getCanonicalPath), Row(ts)) +} + } --- End diff -- Oh, I missed this comment. The Hive ORC and ORC MR readers don't have this bug because they use the `java.sql.Timestamp` class to deserialize the value. The bug appears when we directly access the ORC column's sub-vectors, `times` and `nanos`.
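To make the two sub-vectors concrete, here is a minimal sketch using only `java.sql.Timestamp`, with no ORC classes involved. It assumes that a slot of `times` holds epoch milliseconds and a slot of `nanos` holds the nanosecond-of-second fraction; that layout, and the names `TimestampPartsSketch` and `rebuild`, are assumptions for illustration. It shows that rebuilding the pre-1970 value from the test through `java.sql.Timestamp` round-trips cleanly, which matches the remark that readers going through that class are unaffected.

```scala
import java.sql.Timestamp

object TimestampPartsSketch {
  // The pre-1970 value used in the test quoted above.
  val ts: Timestamp = Timestamp.valueOf("1900-05-05 12:34:56.000789")

  // Hypothetical stand-ins for one slot of each sub-vector (assumed layout):
  // epoch milliseconds and the nanosecond-of-second fraction.
  val millis: Long = ts.getTime
  val nanos: Int = ts.getNanos

  // Rebuild through java.sql.Timestamp, letting the class reconcile the
  // negative millisecond value with the nanosecond field.
  def rebuild(millis: Long, nanos: Int): Timestamp = {
    val result = new Timestamp(millis)
    result.setNanos(nanos)
    result
  }

  val roundTripped: Timestamp = rebuild(millis, nanos)
}
```

Code that combines the two fields by hand has to take the same care with negative millisecond values as `java.sql.Timestamp` does.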
[GitHub] spark issue #21411: [SPARK-24367][SQL]Parquet: use JOB_SUMMARY_LEVEL instead...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21411 retest this please
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame API for S...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21416 **[Test build #91068 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91068/testReport)** for PR 21416 at commit [`ec91220`](https://github.com/apache/spark/commit/ec91220d5e8658500be6d049f8ab3496fc8a914e).
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190401710 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + // Initialize by submitting tasks to read the first row from each writer. + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + /** + * In each call to getNext(), we pull the next row available in the completion queue, and then + * submit another task to read the next row from the writer which returned it. + * + * When a writer sends an epoch marker, we note that it's finished and don't submit another + * task for it in this epoch. The iterator is over once all writers have sent an epoch marker. 
+ */ + override def getNext(): UnsafeRow = { +var nextRow: UnsafeRow = null +while (nextRow == null) { + nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match { +case null => + // Try again if the poll didn't wait long enough to get a real result. + // But we should be getting at least an epoch marker every checkpoint interval. + logWarning( +s"Completion service failed to make progress after $checkpointIntervalMs ms") + null + +// The completion service guarantees this future will be available immediately. +case future => future.get() match { + case ReceiverRow(writerId, r) => +// Start reading the next element in the queue we just took from. +completion.submit(completionTask(writerId)) +r + case ReceiverEpochMarker(writerId) => +// Don't read any more from this queue. If all the writers have sent epoch markers, +// the epoch is over; otherwise we need to poll from the remaining writers. +writerEpochMarkersReceived(writerId) = true +if (writerEpochMarkersReceived.forall(flag => flag)) { --- End diff -- super nit: `writerEpochMarkersReceived.forall(_ == true)` is easier to understand.
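Editor's sketch: the merging pattern described in the diff's doc comment (one pending `take()` per writer queue, resubmitted after each row, with an epoch marker ending that writer's stream) can be tried outside Spark roughly as follows. This is a minimal illustration and not the code from the PR: `MergeQueuesSketch`, `Message`, `DataRow`, `EpochMarker`, and `readEpoch` are hypothetical names, rows are plain strings instead of `UnsafeRow`, and one whole epoch is collected eagerly instead of being exposed through an iterator.

```scala
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors, LinkedBlockingQueue, TimeUnit}

object MergeQueuesSketch {
  // Hypothetical stand-ins for the receiver messages in the diff.
  sealed trait Message { def writerId: Int }
  final case class DataRow(writerId: Int, value: String) extends Message
  final case class EpochMarker(writerId: Int) extends Message

  /** Reads one epoch from all queues and returns the row values in arrival order. */
  def readEpoch(queues: IndexedSeq[LinkedBlockingQueue[Message]], pollMs: Long): Vector[String] = {
    val numWriters = queues.length
    // One flag per writer: has its epoch marker arrived yet?
    val markersReceived = Array.fill(numWriters)(false)
    val executor = Executors.newFixedThreadPool(numWriters)
    val completion = new ExecutorCompletionService[Message](executor)

    // A task that blocks until the given writer's queue has an element.
    def takeFrom(writerId: Int): Callable[Message] = new Callable[Message] {
      override def call(): Message = queues(writerId).take()
    }

    try {
      // Start with one pending read per writer.
      (0 until numWriters).foreach(id => completion.submit(takeFrom(id)))
      val rows = Vector.newBuilder[String]
      while (!markersReceived.forall(received => received)) {
        val future = completion.poll(pollMs, TimeUnit.MILLISECONDS)
        if (future != null) { // null means the poll timed out; just try again
          future.get() match {
            case DataRow(id, value) =>
              // Keep reading from the queue that just produced a row.
              completion.submit(takeFrom(id))
              rows += value
            case EpochMarker(id) =>
              // This writer is done for the epoch; do not resubmit.
              markersReceived(id) = true
          }
        }
      }
      rows.result()
    } finally {
      executor.shutdownNow()
    }
  }
}
```

Because each writer has at most one outstanding read at a time, rows from a single writer keep their order, while the interleaving across writers depends on timing.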
[GitHub] spark pull request #20929: [SPARK-23772][SQL] Provide an option to ignore co...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r190400420 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2408,4 +2408,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { spark.read.option("mode", "PERMISSIVE").option("encoding", "UTF-8").json(Seq(badJson).toDS()), Row(badJson)) } + + test("SPARK-23772 ignore column of all null values or empty array during schema inference") { + withTempPath { tempDir => + val path = tempDir.getAbsolutePath + Seq( +"""{"a":null, "b":[null, null], "c":null, "d":[[], [null]], "e":{}}""", +"""{"a":null, "b":[null], "c":[], "d": [null, []], "e":{}}""", +"""{"a":null, "b":[], "c":[], "d": null, "e":null}""") +.toDS().write.mode("overwrite").text(path) + val df = spark.read.format("json") +.option("dropFieldIfAllNull", true) +.load(path) + val expectedSchema = new StructType() +.add("a", NullType).add("b", NullType).add("c", NullType).add("d", NullType) +.add("e", NullType) + assert(df.schema === expectedSchema) --- End diff -- It seems `DefaultEquality` is used here, which applies `==`. Is there any reason to use `===` instead of just `==`?
[GitHub] spark pull request #20929: [SPARK-23772][SQL] Provide an option to ignore co...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20929#discussion_r190397868 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala --- @@ -379,6 +379,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * that should be used for parsing. * `samplingRatio` (default is 1.0): defines fraction of input JSON objects used * for schema inferring. + * `dropFieldIfAllNull` (default `false`): whether to ignore column of all null values or --- End diff -- How about [DataStreamReader](https://github.com/maropu/spark/blob/SPARK-23772/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala#L277)? I think the description should be added there too.
[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21416#discussion_r190407851 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -219,7 +219,11 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { object OptimizeIn extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral + case In(v, list) if list.isEmpty => +// When v is not nullable, the following expression will be optimized +// to FalseLiteral which is tested in OptimizeInSuite.scala +If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType)) + case In(v, list) if list.length == 1 => EqualTo(v, list.head) --- End diff -- Hmm, @dbtsai, this will have side effects on type casting. For example, see the following. Could you add test cases of this kind?
```scala
scala> sql("select '1.1' in (1), '1.1' = 1").collect()
res0: Array[org.apache.spark.sql.Row] = Array([false,true])
```
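Editor's sketch: the null handling of the rewritten empty-list case in the diff, `If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))`, can be modelled without Catalyst. The encoding below is an assumption made purely for illustration: `None` stands for SQL NULL, and `EmptyInListSketch`, `SqlBool`, and `inEmptyList` are hypothetical names that do not exist in Spark. It does not model the type-casting difference between `In` and `EqualTo` raised in the comment.

```scala
object EmptyInListSketch {
  // Hypothetical encoding: None models SQL NULL, Some(b) a known boolean.
  type SqlBool = Option[Boolean]

  /** Models `v IN ()` as rewritten in the diff: false for a non-null v, NULL for a null v. */
  def inEmptyList[A](v: Option[A]): SqlBool =
    if (v.isDefined) Some(false) else None
}
```

Under this model a non-nullable `v` can only ever produce `Some(false)`, which matches the diff's remark that the expression is then optimized to `FalseLiteral`.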
[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r190372506 --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala --- @@ -414,6 +415,99 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { sc.stop() } + test("spill during iteration") { +val size = 1000 +val conf = createSparkConf(loadDefaults = true) +sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) +val map = createExternalMap[Int] + +map.insertAll((0 until size).iterator.map(i => (i / 10, i))) +assert(map.numSpills == 0, "map was not supposed to spill") + +val it = map.iterator +assert( it.isInstanceOf[CompletionIterator[_, _]]) --- End diff -- @cloud-fan, the assumption here is that there are two references to the underlying map: the upstream iterator and the external map itself. The destroy method first removes the reference held via the upstream iterator, then delegates to the method that clears the reference held via the external map member (`currentMap`, I think), so unless we've missed another reference we should be fine.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190398051 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,71 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + // Initialize by submitting tasks to read the first row from each writer. + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + /** + * In each call to getNext(), we pull the next row available in the completion queue, and then + * submit another task to read the next row from the writer which returned it. + * + * When a writer sends an epoch marker, we note that it's finished and don't submit another + * task for it in this epoch. The iterator is over once all writers have sent an epoch marker. 
+ */ + override def getNext(): UnsafeRow = { +var nextRow: UnsafeRow = null +while (nextRow == null) { + nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match { +case null => + // Try again if the poll didn't wait long enough to get a real result. + // But we should be getting at least an epoch marker every checkpoint interval. + logWarning( +s"Completion service failed to make progress after $checkpointIntervalMs ms") --- End diff -- Maybe also print which writers (and how many) we are waiting on, so that this is easier to debug.
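Editor's sketch: one possible shape for the richer warning suggested above, assuming the iterator keeps a per-writer flag array like `writerEpochMarkersReceived` in the diff. `PendingWritersSketch`, `pendingWriters`, and `progressWarning` are hypothetical helper names, and the message wording is only an example.

```scala
object PendingWritersSketch {
  /** IDs of writers whose epoch marker has not arrived yet. */
  def pendingWriters(markersReceived: Array[Boolean]): List[Int] =
    markersReceived.zipWithIndex.collect { case (false, id) => id }.toList

  /** Builds a warning that names the writers still being waited on. */
  def progressWarning(markersReceived: Array[Boolean], intervalMs: Long): String = {
    val pending = pendingWriters(markersReceived)
    s"Completion service failed to make progress after $intervalMs ms; " +
      s"waiting on ${pending.size} of ${markersReceived.length} writers: ${pending.mkString(", ")}"
  }
}
```

The resulting string could be passed to `logWarning` in place of the fixed message.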
[GitHub] spark issue #21414: [SPARK-24368][SQL] Removing columnPruning from CSVOption...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21414 **[Test build #91057 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91057/testReport)** for PR 21414 at commit [`6fdd435`](https://github.com/apache/spark/commit/6fdd43585abcefcfb911039910ce7dd2a55df4e5). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.