svn commit: r26073 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_30_20_01-529f847-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Sat Mar 31 03:15:30 2018
New Revision: 26073

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_03_30_20_01-529f847 docs

[This commit notification would consist of 1452 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23040][CORE][FOLLOW-UP] Avoid double wrap result Iterator.
Repository: spark
Updated Branches:
  refs/heads/master 15298b99a -> 529f84710

[SPARK-23040][CORE][FOLLOW-UP] Avoid double wrap result Iterator.

## What changes were proposed in this pull request?

Addresses https://github.com/apache/spark/pull/20449#discussion_r172414393: if `resultIter` is already an `InterruptibleIterator`, don't double-wrap it.

## How was this patch tested?

Existing tests.

Author: Xingbo Jiang

Closes #20920 from jiangxb1987/SPARK-23040.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/529f8471
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/529f8471
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/529f8471

Branch: refs/heads/master
Commit: 529f847105fa8d98a5dc4d20955e4870df6bc1c5
Parents: 15298b9
Author: Xingbo Jiang
Authored: Sat Mar 31 10:34:01 2018 +0800
Committer: Wenchen Fan
Committed: Sat Mar 31 10:34:01 2018 +0800

--
 .../apache/spark/shuffle/BlockStoreShuffleReader.scala | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/529f8471/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
--
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 85e7e56..4103dfb 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -111,8 +111,13 @@ private[spark] class BlockStoreShuffleReader[K, C](
       case None => aggregatedIter
     }
-    // Use another interruptible iterator here to support task cancellation as aggregator or(and)
-    // sorter may have consumed previous interruptible iterator.
-    new InterruptibleIterator[Product2[K, C]](context, resultIter)
+
+    resultIter match {
+      case _: InterruptibleIterator[Product2[K, C]] => resultIter
+      case _ =>
+        // Use another interruptible iterator here to support task cancellation as aggregator
+        // or(and) sorter may have consumed previous interruptible iterator.
+        new InterruptibleIterator[Product2[K, C]](context, resultIter)
+    }
   }
 }
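To make the wrap-once pattern concrete outside of Spark, here is a minimal standalone sketch; `SimpleContext` and `SimpleInterruptibleIterator` are simplified stand-ins for Spark's `TaskContext` and `InterruptibleIterator`, not the real classes:

```scala
// Stand-in for TaskContext: only carries a cancellation flag.
class SimpleContext { @volatile var interrupted = false }

// Stand-in for InterruptibleIterator: checks for cancellation between elements.
class SimpleInterruptibleIterator[T](ctx: SimpleContext, underlying: Iterator[T])
    extends Iterator[T] {
  override def hasNext: Boolean = {
    if (ctx.interrupted) throw new RuntimeException("task cancelled")
    underlying.hasNext
  }
  override def next(): T = underlying.next()
}

object WrapOnce {
  // Mirrors the commit's guard: if the iterator is already interruptible,
  // reuse it rather than stacking a second wrapper (an extra per-element hop).
  def interruptible[T](ctx: SimpleContext, iter: Iterator[T]): Iterator[T] = iter match {
    case _: SimpleInterruptibleIterator[_] => iter
    case other => new SimpleInterruptibleIterator(ctx, other)
  }
}
```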
svn commit: r26072 - in /dev/spark/2.3.1-SNAPSHOT-2018_03_30_18_02-507cff2-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Sat Mar 31 01:16:47 2018
New Revision: 26072

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_03_30_18_02-507cff2 docs

[This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23827][SS] StreamingJoinExec should ensure that input data is partitioned into specific number of partitions
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 3f5955aa0 -> 507cff246

[SPARK-23827][SS] StreamingJoinExec should ensure that input data is partitioned into specific number of partitions

## What changes were proposed in this pull request?

Currently, the requiredChildDistribution does not specify the number of partitions. This can cause weird corner cases where the child's distribution is `SinglePartition`, which satisfies the required distribution of `ClusteredDistribution` (with no num-partition requirement), thus eliminating the shuffle needed to repartition input data into the required number of partitions (i.e. the same as the state stores). That can lead to "file not found" errors on the state store delta files, as the micro-batch-with-no-shuffle will not run certain tasks and therefore not generate the expected state store delta files.

This PR adds the required constraint on the number of partitions.

## How was this patch tested?

Modified the test harness to always check that ANY stateful operator should have a constraint on the number of partitions. As part of that, the existing opt-in checks on child output partitioning were removed, as they are redundant.

Author: Tathagata Das

Closes #20941 from tdas/SPARK-23827.

(cherry picked from commit 15298b99ac8944e781328423289586176cf824d7)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/507cff24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/507cff24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/507cff24

Branch: refs/heads/branch-2.3
Commit: 507cff246cd9e15a418d67b66bf762be4ae71c67
Parents: 3f5955a
Author: Tathagata Das
Authored: Fri Mar 30 16:48:26 2018 -0700
Committer: Tathagata Das
Committed: Fri Mar 30 16:48:55 2018 -0700

--
 .../streaming/IncrementalExecution.scala        |  2 +-
 .../StreamingSymmetricHashJoinExec.scala        |  3 +-
 .../spark/sql/streaming/DeduplicateSuite.scala  |  8 +---
 .../streaming/FlatMapGroupsWithStateSuite.scala |  5 +-
 .../sql/streaming/StatefulOperatorTest.scala    | 49
 .../apache/spark/sql/streaming/StreamTest.scala | 19
 .../streaming/StreamingAggregationSuite.scala   |  4 +-
 7 files changed, 25 insertions(+), 65 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/507cff24/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index a10ed5f..1a83c88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -62,7 +62,7 @@ class IncrementalExecution(
         StreamingDeduplicationStrategy :: Nil
   }

-  private val numStateStores = offsetSeqMetadata.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)
+  private[sql] val numStateStores = offsetSeqMetadata.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)
     .map(SQLConf.SHUFFLE_PARTITIONS.valueConverter)
     .getOrElse(sparkSession.sessionState.conf.numShufflePartitions)

http://git-wip-us.apache.org/repos/asf/spark/blob/507cff24/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index c351f65..fa7c8ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -167,7 +167,8 @@ case class StreamingSymmetricHashJoinExec(
   val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)

   override def requiredChildDistribution: Seq[Distribution] =
-    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+    ClusteredDistribution(leftKeys, stateInfo.map(_.numPartitions)) ::
+      ClusteredDistribution(rightKeys, stateInfo.map(_.numPartitions)) :: Nil

   override def output: Seq[Attribute] = joinType match {
     case _: InnerLike => left.output ++ right.output
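To illustrate the corner case described above, here is a toy model of the distribution contract; the types below are simplified stand-ins, not Spark's actual Partitioning/Distribution classes, and the satisfaction semantics are reduced to the minimum needed to show the bug:

```scala
object PartitioningDemo extends App {
  // Toy stand-ins for Spark's physical partitioning and required distribution.
  sealed trait Partitioning { def numPartitions: Int }
  case object SinglePartition extends Partitioning { val numPartitions = 1 }
  case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

  case class ClusteredDistribution(keys: Seq[String], requiredNumPartitions: Option[Int] = None)

  // Simplified satisfaction check: SinglePartition co-locates everything, so it
  // satisfies any clustering requirement that does not pin a partition count.
  def satisfies(p: Partitioning, d: ClusteredDistribution): Boolean = p match {
    case SinglePartition => d.requiredNumPartitions.forall(_ == 1)
    case HashPartitioning(keys, n) => keys == d.keys && d.requiredNumPartitions.forall(_ == n)
  }

  val numStateStores = 200 // e.g. the shuffle-partition count recorded at the first run

  // Old behavior: no partition-count requirement, so a SinglePartition child
  // "satisfies" the distribution and the planner skips the shuffle...
  assert(satisfies(SinglePartition, ClusteredDistribution(Seq("k"))))
  // ...new behavior: pinning the count forces a shuffle to numStateStores partitions.
  assert(!satisfies(SinglePartition, ClusteredDistribution(Seq("k"), Some(numStateStores))))
}
```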
svn commit: r26071 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_30_16_01-ae91720-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Mar 30 23:15:05 2018
New Revision: 26071

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_03_30_16_01-ae91720 docs

[This commit notification would consist of 1452 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r26069 - in /dev/spark/2.3.1-SNAPSHOT-2018_03_30_14_01-3f5955a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Mar 30 21:15:09 2018
New Revision: 26069

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_03_30_14_01-3f5955a docs

[This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23640][CORE] Fix hadoop config may override spark config
Repository: spark
Updated Branches:
  refs/heads/master bc8d09311 -> ae9172017

[SPARK-23640][CORE] Fix hadoop config may override spark config

## What changes were proposed in this pull request?

`spark.shuffle.service.port` may be read from the Hadoop configuration here:
https://github.com/apache/spark/blob/9745ec3a61c99be59ef6a9d5eebd445e8af65b7a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L459

Therefore, the client configuration `spark.shuffle.service.port` does not work unless it is set as `spark.hadoop.spark.shuffle.service.port`.

- This configuration does not work:
```
bin/spark-sql --master yarn --conf spark.shuffle.service.port=7338
```
- This configuration works:
```
bin/spark-sql --master yarn --conf spark.hadoop.spark.shuffle.service.port=7338
```

This PR fixes the issue.

## How was this patch tested?

It's difficult to carry out unit testing, but I've tested it manually.

Author: Yuming Wang

Closes #20785 from wangyum/SPARK-23640.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae917201
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae917201
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae917201

Branch: refs/heads/master
Commit: ae9172017c361e5c1039bc2ca94048117021974a
Parents: bc8d093
Author: Yuming Wang
Authored: Fri Mar 30 14:09:14 2018 -0700
Committer: Marcelo Vanzin
Committed: Fri Mar 30 14:09:14 2018 -0700

--
 .../main/scala/org/apache/spark/util/Utils.scala | 18 +++---
 1 file changed, 11 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ae917201/core/src/main/scala/org/apache/spark/util/Utils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5caedeb..d2be932 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2302,16 +2302,20 @@ private[spark] object Utils extends Logging {
   }

   /**
-   * Return the value of a config either through the SparkConf or the Hadoop configuration
-   * if this is Yarn mode. In the latter case, this defaults to the value set through SparkConf
-   * if the key is not set in the Hadoop configuration.
+   * Return the value of a config either through the SparkConf or the Hadoop configuration.
+   * We check whether the key is set in the SparkConf before looking at any Hadoop configuration.
+   * If the key is set in SparkConf, the value is taken from SparkConf no matter whether we are
+   * running on YARN or not. Only when the key is not set in SparkConf and we are running on
+   * YARN is the value taken from the Hadoop configuration.
    */
   def getSparkOrYarnConfig(conf: SparkConf, key: String, default: String): String = {
-    val sparkValue = conf.get(key, default)
-    if (conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn") {
-      new YarnConfiguration(SparkHadoopUtil.get.newConfiguration(conf)).get(key, sparkValue)
+    if (conf.contains(key)) {
+      conf.get(key, default)
+    } else if (conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn") {
+      new YarnConfiguration(SparkHadoopUtil.get.newConfiguration(conf)).get(key, default)
     } else {
-      sparkValue
+      default
     }
   }
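A standalone sketch of the precedence this change establishes, with plain `Map`s standing in for `SparkConf` and the YARN-side Hadoop `Configuration` (names here are illustrative):

```scala
object ConfPrecedenceDemo extends App {
  // Explicit Spark setting wins; the Hadoop config is consulted only on YARN
  // and only when SparkConf does not contain the key.
  def getSparkOrYarnConfig(
      sparkConf: Map[String, String],
      hadoopConf: Map[String, String],
      isYarn: Boolean,
      key: String,
      default: String): String = {
    if (sparkConf.contains(key)) sparkConf(key)
    else if (isYarn) hadoopConf.getOrElse(key, default)
    else default
  }

  // --conf spark.shuffle.service.port=7338 now takes effect even if the
  // Hadoop configuration also defines the same key.
  val port = getSparkOrYarnConfig(
    sparkConf = Map("spark.shuffle.service.port" -> "7338"),
    hadoopConf = Map("spark.shuffle.service.port" -> "7337"),
    isYarn = true,
    key = "spark.shuffle.service.port",
    default = "7337")
  assert(port == "7338")
}
```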
svn commit: r26068 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_30_12_01-bc8d093-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Mar 30 19:15:21 2018
New Revision: 26068

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_03_30_12_01-bc8d093 docs

[This commit notification would consist of 1452 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: Revert "[SPARK-23785][LAUNCHER] LauncherBackend doesn't check state of connection before setting state"
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 1365d739d -> 3f5955aa0

Revert "[SPARK-23785][LAUNCHER] LauncherBackend doesn't check state of connection before setting state"

This reverts commit 0bfbcaf6696570b74923047266b00ba4dc2ba97c.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f5955aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f5955aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f5955aa

Branch: refs/heads/branch-2.3
Commit: 3f5955aa0a16707a31fa9089d4babd64b5eed6ff
Parents: 1365d73
Author: Marcelo Vanzin
Authored: Fri Mar 30 10:25:17 2018 -0700
Committer: Marcelo Vanzin
Committed: Fri Mar 30 10:25:17 2018 -0700

--
 .../apache/spark/launcher/LauncherBackend.scala |  6 +++---
 .../spark/launcher/LauncherServerSuite.java     | 20
 2 files changed, 3 insertions(+), 23 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3f5955aa/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
--
diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
index 1b049b7..aaae33c 100644
--- a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
@@ -67,13 +67,13 @@ private[spark] abstract class LauncherBackend {
   }

   def setAppId(appId: String): Unit = {
-    if (connection != null && isConnected) {
+    if (connection != null) {
       connection.send(new SetAppId(appId))
     }
   }

   def setState(state: SparkAppHandle.State): Unit = {
-    if (connection != null && isConnected && lastState != state) {
+    if (connection != null && lastState != state) {
       connection.send(new SetState(state))
       lastState = state
     }
@@ -114,10 +114,10 @@ private[spark] abstract class LauncherBackend {

   override def close(): Unit = {
     try {
-      _isConnected = false
       super.close()
     } finally {
       onDisconnected()
+      _isConnected = false
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3f5955aa/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
--
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index 5413d3a..d16337a 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -185,26 +185,6 @@ public class LauncherServerSuite extends BaseSuite {
     }
   }

-  @Test
-  public void testAppHandleDisconnect() throws Exception {
-    LauncherServer server = LauncherServer.getOrCreateServer();
-    ChildProcAppHandle handle = new ChildProcAppHandle(server);
-    String secret = server.registerHandle(handle);
-
-    TestClient client = null;
-    try {
-      Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
-      client = new TestClient(s);
-      client.send(new Hello(secret, "1.4.0"));
-      handle.disconnect();
-      waitForError(client, secret);
-    } finally {
-      handle.kill();
-      close(client);
-      client.clientThread.join();
-    }
-  }
-
   private void close(Closeable c) {
     if (c != null) {
       try {
spark git commit: [SPARK-23500][SQL][FOLLOWUP] Fix complex type simplification rules to apply to entire plan
Repository: spark
Updated Branches:
  refs/heads/master 5b5a36ed6 -> bc8d09311

[SPARK-23500][SQL][FOLLOWUP] Fix complex type simplification rules to apply to entire plan

## What changes were proposed in this pull request?

This PR improves the test coverage of the original PR https://github.com/apache/spark/pull/20687

## How was this patch tested?

N/A

Author: gatorsmile

Closes #20911 from gatorsmile/addTests.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc8d0931
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc8d0931
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc8d0931

Branch: refs/heads/master
Commit: bc8d0931170cfa20a4fb64b3b11a2027ddb0d6e9
Parents: 5b5a36e
Author: gatorsmile
Authored: Fri Mar 30 23:21:07 2018 +0800
Committer: Wenchen Fan
Committed: Fri Mar 30 23:21:07 2018 +0800

--
 .../catalyst/optimizer/complexTypesSuite.scala  | 176 +--
 .../apache/spark/sql/ComplexTypesSuite.scala    | 109
 2 files changed, 233 insertions(+), 52 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/bc8d0931/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
index e44a669..21ed987 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala
@@ -47,10 +47,17 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper {
         SimplifyExtractValueOps) :: Nil
   }

-  val idAtt = ('id).long.notNull
-  val nullableIdAtt = ('nullable_id).long
+  private val idAtt = ('id).long.notNull
+  private val nullableIdAtt = ('nullable_id).long

-  lazy val relation = LocalRelation(idAtt, nullableIdAtt)
+  private val relation = LocalRelation(idAtt, nullableIdAtt)
+  private val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.double, 'e.int)
+
+  private def checkRule(originalQuery: LogicalPlan, correctAnswer: LogicalPlan) = {
+    val optimized = Optimizer.execute(originalQuery.analyze)
+    assert(optimized.resolved, "optimized plans must be still resolvable")
+    comparePlans(optimized, correctAnswer.analyze)
+  }

   test("explicit get from namedStruct") {
     val query = relation
@@ -58,31 +65,28 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper {
       GetStructField(
         CreateNamedStruct(Seq("att", 'id )),
         0,
-        None) as "outerAtt").analyze
-    val expected = relation.select('id as "outerAtt").analyze
+        None) as "outerAtt")
+    val expected = relation.select('id as "outerAtt")

-    comparePlans(Optimizer execute query, expected)
+    checkRule(query, expected)
   }

   test("explicit get from named_struct- expression maintains original deduced alias") {
     val query = relation
       .select(GetStructField(CreateNamedStruct(Seq("att", 'id)), 0, None))
-      .analyze

     val expected = relation
       .select('id as "named_struct(att, id).att")
-      .analyze

-    comparePlans(Optimizer execute query, expected)
+    checkRule(query, expected)
   }

   test("collapsed getStructField ontop of namedStruct") {
     val query = relation
       .select(CreateNamedStruct(Seq("att", 'id)) as "struct1")
       .select(GetStructField('struct1, 0, None) as "struct1Att")
-      .analyze
-    val expected = relation.select('id as "struct1Att").analyze
-    comparePlans(Optimizer execute query, expected)
+    val expected = relation.select('id as "struct1Att")
+    checkRule(query, expected)
   }

   test("collapse multiple CreateNamedStruct/GetStructField pairs") {
@@ -94,16 +98,14 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper {
       .select(
         GetStructField('struct1, 0, None) as "struct1Att1",
         GetStructField('struct1, 1, None) as "struct1Att2")
-      .analyze

     val expected =
       relation.
         select(
           'id as "struct1Att1",
           ('id * 'id) as "struct1Att2")
-        .analyze

-    comparePlans(Optimizer execute query, expected)
+    checkRule(query, expected)
   }

   test("collapsed2 - deduced names") {
@@ -115,16 +117,14 @@ class ComplexTypesSuite extends PlanTest with ExpressionEvalHelper {
       .select(
         GetStructField('struct1, 0, None),
         GetStructField('struct1, 1, None))
-      .analyze

     val expected =
       relation.
svn commit: r26060 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_30_08_01-5b5a36e-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Mar 30 15:18:34 2018
New Revision: 26060

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_03_30_08_01-5b5a36e docs

[This commit notification would consist of 1452 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: Roll forward "[SPARK-23096][SS] Migrate rate source to V2"
Repository: spark
Updated Branches:
  refs/heads/master b02e76cbf -> 5b5a36ed6

Roll forward "[SPARK-23096][SS] Migrate rate source to V2"

## What changes were proposed in this pull request?

Roll forward c68ec4e (#20688).

There are two minor test changes required:

* An error which used to be TreeNodeException[ArithmeticException] is no longer wrapped and is now just ArithmeticException.
* The test framework simply does not set the active Spark session. (Or rather, it doesn't do so early enough - I think it only happens when a query is analyzed.) I've added the required logic to SQLTestUtils.

## How was this patch tested?

Existing tests.

Author: Jose Torres
Author: jerryshao

Closes #20922 from jose-torres/ratefix.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b5a36ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b5a36ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b5a36ed

Branch: refs/heads/master
Commit: 5b5a36ed6d2bb0971edfeccddf0f280936d2275f
Parents: b02e76c
Author: Jose Torres
Authored: Fri Mar 30 21:54:26 2018 +0800
Committer: Wenchen Fan
Committed: Fri Mar 30 21:54:26 2018 +0800

--
 apache.spark.sql.sources.DataSourceRegister     |   3 +-
 .../sql/execution/datasources/DataSource.scala  |   6 +-
 .../streaming/RateSourceProvider.scala          | 262 ---
 .../continuous/ContinuousRateStreamSource.scala |  25 +-
 .../sources/RateStreamMicroBatchReader.scala    | 222
 .../streaming/sources/RateStreamProvider.scala  | 125 +++
 .../streaming/sources/RateStreamSourceV2.scala  | 187 ---
 .../execution/streaming/RateSourceSuite.scala   | 194 ---
 .../execution/streaming/RateSourceV2Suite.scala | 191 ---
 .../sources/RateStreamProviderSuite.scala       | 334 +++
 10 files changed, 705 insertions(+), 844 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5b5a36ed/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
--
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 1fe9c09..1b37905 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -5,6 +5,5 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
-org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
-org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2

http://git-wip-us.apache.org/repos/asf/spark/blob/5b5a36ed/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 31fa89b..b84ea76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
+import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
@@ -566,6 +566,7 @@ object DataSource extends Logging {
     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
+    val rate = classOf[RateStreamProvider].getCanonicalName

     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -587,7 +588,8
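For context, sources such as RateStreamProvider are discovered through `java.util.ServiceLoader` entries in that `META-INF/services` file. A hedged sketch of how a custom short name is registered; `MyRateLikeProvider` is hypothetical, and a real source would also implement a provider trait (e.g. `StreamSourceProvider`), omitted here:

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.DataSourceRegister

// Hypothetical provider: listing its fully-qualified name in
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister lets
// spark.readStream.format("my-rate") resolve to it by short name.
class MyRateLikeProvider extends DataSourceRegister {
  override def shortName(): String = "my-rate"
}

object ListRegisteredSources extends App {
  // Enumerate every registered source's short name, the same lookup that
  // DataSource performs when resolving a format string.
  val shortNames = ServiceLoader.load(classOf[DataSourceRegister]).asScala.map(_.shortName())
  shortNames.foreach(println)
}
```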
svn commit: r26056 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_30_04_01-b02e76c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Mar 30 11:20:58 2018
New Revision: 26056

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_03_30_04_01-b02e76c docs

[This commit notification would consist of 1452 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r26046 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_30_00_01-df05fb6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Mar 30 07:16:45 2018
New Revision: 26046

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_03_30_00_01-df05fb6 docs

[This commit notification would consist of 1452 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23727][SQL] Support for pushing down filters for DateType in parquet
Repository: spark
Updated Branches:
  refs/heads/master df05fb63a -> b02e76cbf

[SPARK-23727][SQL] Support for pushing down filters for DateType in parquet

## What changes were proposed in this pull request?

This PR adds support for pushing down filters for DateType in Parquet.

## How was this patch tested?

Added a UT and tested locally.

Author: yucai

Closes #20851 from yucai/SPARK-23727.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b02e76cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b02e76cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b02e76cb

Branch: refs/heads/master
Commit: b02e76cbffe9e589b7a4e60f91250ca12a4420b2
Parents: df05fb6
Author: yucai
Authored: Fri Mar 30 15:07:38 2018 +0800
Committer: Wenchen Fan
Committed: Fri Mar 30 15:07:38 2018 +0800

--
 .../org/apache/spark/sql/internal/SQLConf.scala |  9
 .../datasources/parquet/ParquetFilters.scala    | 33 +
 .../parquet/ParquetFilterSuite.scala            | 50 ++--
 3 files changed, 89 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/b02e76cb/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9cb03b5..13f31a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -353,6 +353,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

+  val PARQUET_FILTER_PUSHDOWN_DATE_ENABLED = buildConf("spark.sql.parquet.filterPushdown.date")
+    .doc("If true, enables Parquet filter push-down optimization for Date. " +
+      "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.")
+    .internal()
+    .booleanConf
+    .createWithDefault(true)
+
   val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat")
     .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " +
       "versions, when converting Parquet schema to Spark SQL schema and vice versa.")
@@ -1329,6 +1336,8 @@ class SQLConf extends Serializable with Logging {

   def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)

+  def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
+
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)

   def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)

http://git-wip-us.apache.org/repos/asf/spark/blob/b02e76cb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 763841e..ccc8306 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,10 +17,15 @@

 package org.apache.spark.sql.execution.datasources.parquet

+import java.sql.Date
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary

+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._

@@ -29,6 +34,10 @@ import org.apache.spark.sql.types._
  */
 private[parquet] object ParquetFilters {

+  private def dateToDays(date: Date): SQLDate = {
+    DateTimeUtils.fromJavaDate(date)
+  }
+
   private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
     case BooleanType =>
       (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
@@ -50,6 +59,10 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+      (n: String, v: Any) => FilterApi.eq(
+        intColumn(n),
+        Option(v).map(date =>
spark git commit: [SPARK-23743][SQL] Changed a comparison logic from containing 'slf4j' to starting with 'org.slf4j'
Repository: spark
Updated Branches:
  refs/heads/master b34890119 -> df05fb63a

[SPARK-23743][SQL] Changed a comparison logic from containing 'slf4j' to starting with 'org.slf4j'

## What changes were proposed in this pull request?

isSharedClass returns whether a class can/should be shared or not. It checks whether the class name contains certain keywords or starts with certain prefixes. Under that logic, unintended behavior can occur when a custom package has `slf4j` inside the package or class name. The original intention appears to have been to match classes under `org.slf4j`, so it is better to change the comparison to `name.startsWith("org.slf4j")`.

## How was this patch tested?

This patch should pass all of the current tests and keep all of the current behaviors. In my case, I'm using ProtobufDeserializer to get a table schema from Hive tables, and some of the Protobuf packages and class names contain `slf4j`. Without this patch, those classes cannot be resolved because of a ClassCastException across different classloaders.

Author: Jongyoul Lee

Closes #20860 from jongyoul/SPARK-23743.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df05fb63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df05fb63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df05fb63

Branch: refs/heads/master
Commit: df05fb63abe6018ccbe572c34cf65fc3ecbf1166
Parents: b348901
Author: Jongyoul Lee
Authored: Fri Mar 30 14:07:35 2018 +0800
Committer: jerryshao
Committed: Fri Mar 30 14:07:35 2018 +0800

--
 .../org/apache/spark/sql/hive/client/IsolatedClientLoader.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/df05fb63/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 12975bc..c2690ec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -179,8 +179,9 @@ private[hive] class IsolatedClientLoader(
     val isHadoopClass =
       name.startsWith("org.apache.hadoop.") && !name.startsWith("org.apache.hadoop.hive.")

-    name.contains("slf4j") ||
-    name.contains("log4j") ||
+    name.startsWith("org.slf4j") ||
+    name.startsWith("org.apache.log4j") || // log4j1.x
+    name.startsWith("org.apache.logging.log4j") || // log4j2
     name.startsWith("org.apache.spark.") ||
     (sharesHadoopClasses && isHadoopClass) ||
     name.startsWith("scala.") ||
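Why `startsWith` is the right predicate here, as a small sketch (the class names below are illustrative, not taken from a real project):

```scala
object SharedClassDemo extends App {
  // The fixed predicate: share only the real logging packages across loaders.
  def isSharedLoggingClass(name: String): Boolean =
    name.startsWith("org.slf4j") ||
    name.startsWith("org.apache.log4j") ||        // log4j 1.x
    name.startsWith("org.apache.logging.log4j")   // log4j 2

  assert(isSharedLoggingClass("org.slf4j.LoggerFactory"))

  // The old name.contains("slf4j") check also matched user classes like this
  // one, loading them from the wrong classloader and causing ClassCastExceptions.
  assert(!isSharedLoggingClass("com.example.proto.slf4j_schema.TableMessage"))
}
```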