[GitHub] [spark] gaborgsomogyi commented on a change in pull request #29729: [SPARK-32032][SS] Avoid infinite wait in driver because of KafkaConsumer.poll(long) API

2020-11-30 Thread GitBox


gaborgsomogyi commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r533134611



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -1415,6 +1415,15 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val USE_DEPRECATED_KAFKA_OFFSET_FETCHING =
+buildConf("spark.sql.streaming.kafka.useDeprecatedOffsetFetching")
+  .internal()
+  .doc("When true, the deprecated Consumer based offset fetching used 
which could cause " +

Review comment:
   Fixed. Updating the description too, since I've double-checked the generated HTML files.
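
   For reference, a minimal usage sketch (not part of the quoted diff; the default value and exact behaviour are whatever the PR finally ships) showing how the new key could be toggled from a session:

   ```scala
   // Hypothetical usage: disable the deprecated Consumer-based offset fetching
   // for the current session. The key comes from the SQLConf diff above.
   spark.conf.set("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")
   ```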








[GitHub] [spark] Ngone51 opened a new pull request #30488: [SPARK-33071][SPARK-33536][SQL] Avoid changing dataset_id of LogicalPlan in join() to not break DetectAmbiguousSelfJoin

2020-11-30 Thread GitBox


Ngone51 opened a new pull request #30488:
URL: https://github.com/apache/spark/pull/30488


   
   
   ### What changes were proposed in this pull request?
   
   
   Currently, `join()` uses `withPlan(logicalPlan)` for convenience, to call some Dataset functions. But this makes the `dataset_id` inconsistent between the `logicalPlan` and the original `Dataset` (because `withPlan(logicalPlan)` creates a new Dataset with a new id and resets the `dataset_id` of the `logicalPlan` to that new id). As a result, it breaks the rule `DetectAmbiguousSelfJoin`.
   
   In this PR, we propose to drop the usage of `withPlan` but use the 
`logicalPlan` directly so its `dataset_id` doesn't change.
   
   Besides, this PR also removes the related metadata (`DATASET_ID_KEY`, `COL_POS_KEY`) when an `Alias` constructs its own metadata, because the `Alias` is no longer a reference column after being converted to an `Attribute`. To achieve that, we add a new field, `deniedMetadataKeys`, to indicate the metadata keys that need to be removed.
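
   A minimal sketch of that idea (helper name and wiring are illustrative, not the PR's exact code):

   ```scala
   import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

   // Strip the denied keys from an Alias's metadata so the aliased column no
   // longer carries the originating Dataset's id/position markers.
   def removeDeniedKeys(metadata: Metadata, deniedMetadataKeys: Seq[String]): Metadata = {
     val builder = new MetadataBuilder().withMetadata(metadata)
     deniedMetadataKeys.foreach(builder.remove)
     builder.build()
   }
   ```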
   
   
   ### Why are the changes needed?
   
   
   For the query below, it returns a wrong result while it should throw an ambiguous self-join exception instead:
   
   ```scala
   val emp1 = Seq[TestData](
     TestData(1, "sales"),
     TestData(2, "personnel"),
     TestData(3, "develop"),
     TestData(4, "IT")).toDS()
   val emp2 = Seq[TestData](
     TestData(1, "sales"),
     TestData(2, "personnel"),
     TestData(3, "develop")).toDS()
   val emp3 = emp1.join(emp2, emp1("key") === emp2("key")).select(emp1("*"))
   emp1.join(emp3, emp1.col("key") === emp3.col("key"), "left_outer")
     .select(emp1.col("*"), emp3.col("key").as("e2")).show()

   // wrong result
   +---+---------+---+
   |key|    value| e2|
   +---+---------+---+
   |  1|    sales|  1|
   |  2|personnel|  2|
   |  3|  develop|  3|
   |  4|       IT|  4|
   +---+---------+---+
   ```
   This PR fixes the wrong behaviour.
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   Yes, users hit the exception instead of the wrong result after this PR.
   
   
   ### How was this patch tested?
   
   
   Added a new unit test.
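
   A sketch of what such a test can assert (the exact error message and structure are assumptions, not the PR's actual test; `intercept` comes from ScalaTest as used in Spark's test suites):

   ```scala
   import org.apache.spark.sql.AnalysisException

   // With the fix, building the query from the example above should fail analysis
   // with an ambiguous self-join error instead of silently resolving to emp1's key.
   val e = intercept[AnalysisException] {
     emp1.join(emp3, emp1.col("key") === emp3.col("key"), "left_outer")
       .select(emp1.col("*"), emp3.col("key").as("e2"))
   }
   assert(e.getMessage.toLowerCase.contains("ambiguous"))
   ```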
   






[GitHub] [spark] Ngone51 closed pull request #30488: [SPARK-33071][SPARK-33536][SQL] Avoid changing dataset_id of LogicalPlan in join() to not break DetectAmbiguousSelfJoin

2020-11-30 Thread GitBox


Ngone51 closed pull request #30488:
URL: https://github.com/apache/spark/pull/30488


   






[GitHub] [spark] HeartSaVioR commented on pull request #30395: [SPARK-32863][SS] Full outer stream-stream join

2020-11-30 Thread GitBox


HeartSaVioR commented on pull request #30395:
URL: https://github.com/apache/spark/pull/30395#issuecomment-736282615


   All current reviewers are OK with this, so I'm kicking the Github Action 
build again. Once the test passes I'll merge this.






[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZhuuuu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533123647



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1919,18 +1919,24 @@ class SparkContext(config: SparkConf) extends Logging {
   // A JAR file which exists only on the driver node
   case "file" => addLocalJarFile(new File(uri.getPath))
   // A JAR file which exists locally on every worker node
-  case "local" => "file:" + uri.getPath
+  case "local" => Array("file:" + uri.getPath)
+  case "ivy" =>
+// Since `new Path(path).toUri` will lose query information,
+// so here we use `URI.create(path)`
+
DependencyUtils.resolveMavenDependencies(URI.create(path)).split(",")
   case _ => checkRemoteJarFile(path)
 }
   }
-  if (key != null) {
+  if (keys.nonEmpty) {
 val timestamp = if (addedOnSubmit) startTime else 
System.currentTimeMillis
-if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
-  logInfo(s"Added JAR $path at $key with timestamp $timestamp")
-  postEnvironmentUpdate()
-} else {
-  logWarning(s"The jar $path has been added already. Overwriting of 
added jars " +
-"is not supported in the current version.")
+keys.foreach { key =>
+  if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
+logInfo(s"Added JAR $path at $key with timestamp $timestamp")
+postEnvironmentUpdate()
+  } else {
+logWarning(s"The jar $path has been added already. Overwriting of 
added jars " +

Review comment:
   > If `transitive=true` and there are too many dependencies, users 
possibly see a lot of warning messages?
   
   Also `logInfo(s"Added JAR $path at $key with timestamp $timestamp")`
   
   How about something like:
   ```scala
   val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
   if (added.nonEmpty) {
     logInfo(s"Added JAR $path at ${added.mkString(",")} with timestamp $timestamp")
     postEnvironmentUpdate()
   }
   if (existed.nonEmpty) {
     logWarning(s"The dependency jars ${existed.mkString(",")} of $path have been added" +
       " already. Overwriting of added jars is not supported in the current version.")
   }
   ```
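
   For illustration, a self-contained sketch of what that `partition` over `putIfAbsent` does (a `TrieMap` stands in for `SparkContext`'s `addedJars` map; the values are made up):

   ```scala
   import scala.collection.concurrent.TrieMap

   // "a.jar" is already registered, so it ends up in `existed`;
   // the other two are inserted and end up in `added`.
   val addedJars = TrieMap[String, Long]("a.jar" -> 1L)
   val keys = Seq("a.jar", "b.jar", "c.jar")
   val timestamp = System.currentTimeMillis()
   val (added, existed) = keys.partition(addedJars.putIfAbsent(_, timestamp).isEmpty)
   println(s"added=$added, existed=$existed")  // added=List(b.jar, c.jar), existed=List(a.jar)
   ```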
   








[GitHub] [spark] aokolnychyi commented on a change in pull request #30558: [SPARK-33612][SQL] Add v2SourceRewriteRules batch to Optimizer

2020-11-30 Thread GitBox


aokolnychyi commented on a change in pull request #30558:
URL: https://github.com/apache/spark/pull/30558#discussion_r533123744



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##
@@ -189,6 +189,9 @@ abstract class Optimizer(catalogManager: CatalogManager)
 // plan may contain nodes that do not report stats. Anything that uses 
stats must run after
 // this batch.
 Batch("Early Filter and Projection Push-Down", Once, 
earlyScanPushDownRules: _*) :+
+// This batch rewrites plans for v2 tables. It should be run after the 
operator
+// optimization batch and before any batches that depend on stats.
+Batch("V2 Source Rewrite Rules", Once, v2SourceRewriteRules: _*) :+

Review comment:
   Right now, scan construction and pushdown happen in the same method. We could split that later, so I agree with moving this batch before early filter and projection pushdown.









[GitHub] [spark] HyukjinKwon commented on a change in pull request #30504: [SPARK-33544][SQL] Optimizer should not insert filter when explode with CreateArray/CreateMap

2020-11-30 Thread GitBox


HyukjinKwon commented on a change in pull request #30504:
URL: https://github.com/apache/spark/pull/30504#discussion_r533123592



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
##
@@ -48,6 +48,9 @@ object ConstantFolding extends Rule[LogicalPlan] {
   // object and running eval unnecessarily.
   case l: Literal => l
 
+  case Size(c: CreateArray, _) => Literal(c.children.length)
+  case Size(c: CreateMap, _) => Literal(c.children.length / 2)

Review comment:
   Hm.. one case I can imagine is when the children of `CreateArray` or `CreateMap` throw an exception, e.g. from a UDF or when ANSI mode is enabled. After this change, it looks like it's going to suppress such exceptions. For example:
   
   ```scala
   spark.range(1).selectExpr("size(array(assert_true(false)))").show()
   ```
   
   As an alternative, we could maybe only allow this when the children of `CreateArray` or `CreateMap` are safe and don't throw an exception, e.g. only when they are `AttributeReference`s?
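
   A sketch of that alternative as a guarded rewrite (illustrative only, not the change that was merged):

   ```scala
   import org.apache.spark.sql.catalyst.expressions.{AttributeReference, CreateArray, CreateMap, Expression, Literal, Size}

   // Fold size(...) over CreateArray/CreateMap only when every child is a plain
   // attribute reference, so children that may throw (UDFs, ANSI failures,
   // assert_true) are never constant-folded away.
   val guardedSizeFolding: PartialFunction[Expression, Expression] = {
     case Size(c: CreateArray, _) if c.children.forall(_.isInstanceOf[AttributeReference]) =>
       Literal(c.children.length)
     case Size(c: CreateMap, _) if c.children.forall(_.isInstanceOf[AttributeReference]) =>
       Literal(c.children.length / 2)
   }
   ```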








[GitHub] [spark] otterc commented on a change in pull request #30312: [SPARK-32917][SHUFFLE][CORE] Adds support for executors to push shuffle blocks after successful map task completion

2020-11-30 Thread GitBox


otterc commented on a change in pull request #30312:
URL: https://github.com/apache/spark/pull/30312#discussion_r533121614



##
File path: 
core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.File
+import java.net.ConnectException
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark._
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockFetchingListener, 
BlockStoreClient}
+import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
+import org.apache.spark.network.util.TransportConf
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.storage._
+
+class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = 
_
+  @Mock(answer = RETURNS_SMART_NULLS) private var dependency: 
ShuffleDependency[Int, Int, Int] = _
+  @Mock(answer = RETURNS_SMART_NULLS) private var shuffleClient: 
BlockStoreClient = _
+
+  private var conf: SparkConf = _
+  private var pushedBlocks = new ArrayBuffer[String]
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+conf = new SparkConf(loadDefaults = false)
+MockitoAnnotations.initMocks(this)
+when(dependency.partitioner).thenReturn(new HashPartitioner(8))
+when(dependency.serializer).thenReturn(new JavaSerializer(conf))
+
when(dependency.getMergerLocs).thenReturn(Seq(BlockManagerId("test-client", 
"test-client", 1)))
+conf.set("spark.shuffle.push.based.enabled", "true")
+conf.set("spark.shuffle.service.enabled", "true")
+// Set the env because the shuffler writer gets the shuffle client 
instance from the env.
+val mockEnv = mock(classOf[SparkEnv])
+when(mockEnv.conf).thenReturn(conf)
+when(mockEnv.blockManager).thenReturn(blockManager)
+SparkEnv.set(mockEnv)
+when(blockManager.blockStoreClient).thenReturn(shuffleClient)
+  }
+
+  override def afterEach(): Unit = {
+pushedBlocks.clear()
+super.afterEach()
+  }
+
+  private def interceptPushedBlocksForSuccess(): Unit = {
+when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
+  .thenAnswer((invocation: InvocationOnMock) => {
+val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
+pushedBlocks ++= blocks
+val managedBuffers = 
invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
+val blockFetchListener = 
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+(blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
+  blockFetchListener.onBlockFetchSuccess(blockId, buffer)
+})
+  })
+  }
+
+  test("Basic block push") {
+interceptPushedBlocksForSuccess()
+new TestShuffleBlockPusher(mock(classOf[File]),
+  Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0, 
conf)
+.initiateBlockPush()
+verify(shuffleClient, times(1))
+  .pushBlocks(any(), any(), any(), any(), any())
+assert(pushedBlocks.length == dependency.partitioner.numPartitions)
+ShuffleBlockPusher.stop()
+  }
+
+  test("Large blocks are skipped for push") {
+conf.set("spark.shuffle.push.maxBlockSizeToPush", "1k")
+interceptPushedBlocksForSuccess()
+new TestShuffleBlockPusher(mock(classOf[File]), Array(2, 2, 2, 2, 2, 2, 2, 
1100),
+  dependency, 0, conf).initiateBlockPush()
+verify(shuffleClient, times(1))
+  .pushBlocks(any(), any(), any(), any(), any())
+assert(pushedBlocks.length == dependency.partitioner.numPartitions - 1)
+ShuffleBlockPusher.stop()
+  }
+
+  test("Number of blocks in flight per address are limited by 
maxBlocksInFlightPerAddress") {
+

[GitHub] [spark] otterc commented on a change in pull request #30312: [SPARK-32917][SHUFFLE][CORE] Adds support for executors to push shuffle blocks after successful map task completion

2020-11-30 Thread GitBox


otterc commented on a change in pull request #30312:
URL: https://github.com/apache/spark/pull/30312#discussion_r533120860



##
File path: 
core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.File
+import java.net.ConnectException
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark._
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockFetchingListener, 
BlockStoreClient}
+import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
+import org.apache.spark.network.util.TransportConf
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.storage._
+
+class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = 
_
+  @Mock(answer = RETURNS_SMART_NULLS) private var dependency: 
ShuffleDependency[Int, Int, Int] = _
+  @Mock(answer = RETURNS_SMART_NULLS) private var shuffleClient: 
BlockStoreClient = _
+
+  private var conf: SparkConf = _
+  private var pushedBlocks = new ArrayBuffer[String]
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+conf = new SparkConf(loadDefaults = false)
+MockitoAnnotations.initMocks(this)
+when(dependency.partitioner).thenReturn(new HashPartitioner(8))
+when(dependency.serializer).thenReturn(new JavaSerializer(conf))
+
when(dependency.getMergerLocs).thenReturn(Seq(BlockManagerId("test-client", 
"test-client", 1)))
+conf.set("spark.shuffle.push.based.enabled", "true")
+conf.set("spark.shuffle.service.enabled", "true")
+// Set the env because the shuffler writer gets the shuffle client 
instance from the env.
+val mockEnv = mock(classOf[SparkEnv])
+when(mockEnv.conf).thenReturn(conf)
+when(mockEnv.blockManager).thenReturn(blockManager)
+SparkEnv.set(mockEnv)
+when(blockManager.blockStoreClient).thenReturn(shuffleClient)
+  }
+
+  override def afterEach(): Unit = {
+pushedBlocks.clear()
+super.afterEach()
+  }
+
+  private def interceptPushedBlocksForSuccess(): Unit = {
+when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
+  .thenAnswer((invocation: InvocationOnMock) => {
+val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
+pushedBlocks ++= blocks
+val managedBuffers = 
invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
+val blockFetchListener = 
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+(blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
+  blockFetchListener.onBlockFetchSuccess(blockId, buffer)
+})
+  })
+  }
+
+  test("Basic block push") {
+interceptPushedBlocksForSuccess()
+new TestShuffleBlockPusher(mock(classOf[File]),
+  Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0, 
conf)
+.initiateBlockPush()
+verify(shuffleClient, times(1))
+  .pushBlocks(any(), any(), any(), any(), any())

Review comment:
   done








[GitHub] [spark] otterc commented on a change in pull request #30312: [SPARK-32917][SHUFFLE][CORE] Adds support for executors to push shuffle blocks after successful map task completion

2020-11-30 Thread GitBox


otterc commented on a change in pull request #30312:
URL: https://github.com/apache/spark/pull/30312#discussion_r533120443



##
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
##
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.File
+import java.net.ConnectException
+import java.nio.ByteBuffer
+import java.util.concurrent.ExecutorService
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+
+import com.google.common.base.Throwables
+
+import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv}
+import org.apache.spark.annotation.Since
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
+import org.apache.spark.network.util.TransportConf
+import org.apache.spark.shuffle.ShuffleBlockPusher._
+import org.apache.spark.storage.{BlockId, BlockManagerId, ShufflePushBlockId}
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Used for pushing shuffle blocks to remote shuffle services when push 
shuffle is enabled.
+ * When push shuffle is enabled, it is created after the shuffle writer 
finishes writing the shuffle
+ * file and initiates the block push process.
+ *
+ * @param dataFile mapper generated shuffle data file
+ * @param partitionLengths array of shuffle block size so we can tell shuffle 
block
+ * boundaries within the shuffle file
+ * @param dep  shuffle dependency to get shuffle ID and the 
location of remote shuffle
+ * services to push local shuffle blocks
+ * @param partitionId  map index of the shuffle map task
+ * @param conf spark configuration
+ */
+@Since("3.1.0")
+private[spark] class ShuffleBlockPusher(
+dataFile: File,
+partitionLengths: Array[Long],
+dep: ShuffleDependency[_, _, _],
+partitionId: Int,
+conf: SparkConf) extends Logging {

Review comment:
   I have made the change. Please take a look.








[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZhuuuu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533117100



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean

Review comment:
   > What if `transitive=invalidStr` in hive? Could you add tests?
   
   I changed the logic to behave like Hive, avoiding the conversion error from `toBoolean`.
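
   A minimal sketch of that "parse it like Hive" behaviour (hypothetical helper, not the PR's code):

   ```scala
   // Take the last `transitive` value and fall back to the default for anything
   // that is not a recognised boolean, instead of letting `toBoolean` throw.
   def parseTransitiveLenient(values: Array[String], default: Boolean = false): Boolean =
     values.lastOption match {
       case Some(v) if v.equalsIgnoreCase("true")  => true
       case Some(v) if v.equalsIgnoreCase("false") => false
       case _                                      => default // missing or invalid value
     }
   ```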








[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZhuuuu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533116569



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean

Review comment:
   Added in `test("SPARK-33084: Add jar support ivy url")`
   ```scala
   // Invalid transitive value, will use default value `false`
   sc.addJar("ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0?transitive=foo")
   assert(!sc.listJars().exists(_.contains("scalajs-library_2.12")))
   assert(sc.listJars().exists(_.contains("scalajs-test-interface_2.12")))
   ```








[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZhuuuu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533115969



##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -366,6 +366,12 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 }
   }
 
+  test("add jar local path with comma") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local"))
+sc.addJar("file://Test,UDTF.jar")
+assert(!sc.listJars().exists(_.contains("UDTF.jar")))

Review comment:
   > What does this test mean? Why didn't you write it like ` 
assert(sc.listJars().exists(_.contains("Test,UDTF.jar")))`?
   
   Mess in brain...

##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -366,6 +366,12 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 }
   }
 
+  test("add jar local path with comma") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local"))

Review comment:
   > Cold you add tests for other other schemas?
   
   Updated








[GitHub] [spark] cloud-fan commented on a change in pull request #30544: [SPARK-32405][SQL][FOLLOWUP] Throw Exception if provider is specified in JDBCTableCatalog create table

2020-11-30 Thread GitBox


cloud-fan commented on a change in pull request #30544:
URL: https://github.com/apache/spark/pull/30544#discussion_r533110907



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
##
@@ -128,6 +128,8 @@ class JDBCTableCatalog extends TableCatalog with Logging {
   case "comment" => tableComment = v
   // ToDo: have a follow up to fail provider once unify create table 
syntax PR is merged

Review comment:
   nit: remove TODO








[GitHub] [spark] waitinfuture commented on pull request #30516: [SPARK-33572][SQL] Datetime building should fail if the year, month, ..., second combination is invalid

2020-11-30 Thread GitBox


waitinfuture commented on pull request #30516:
URL: https://github.com/apache/spark/pull/30516#issuecomment-736253437


   @cloud-fan test passed, ready for merge






[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZhuuuu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533099833



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean
+}
+  }
+
+  /**
+   * Download Ivy URIs dependency jars.
+   *
+   * @param uri Ivy uri need to be downloaded. The URI format should be:
+   *  `ivy://group:module:version[?query]`
+   *Ivy URI query part format should be:
+   *  `parameter=value&parameter=value...`
+   *Note that currently ivy URI query part support two parameters:
+   * 1. transitive: whether to download dependent jars related to 
your ivy URL.
+   *transitive=false or `transitive=true`, if not set, the 
default value is false.
+   * 2. exclude: exclusion list when download ivy URL jar and 
dependency jars.
+   *The `exclude` parameter content is a ',' separated 
`group:module` pair string :
+   *`exclude=group:module,group:module...`
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+val Seq(_, _, repositories, ivyRepoPath, ivySettingsPath) =
+  DependencyUtils.getIvyProperties()
+val authority = uri.getAuthority
+if (authority == null) {
+  throw new URISyntaxException(
+authority, "Invalid url: Expected 'org:module:version', found null")
+}
+if (authority.split(":").length != 3) {
+  throw new URISyntaxException(
+authority, "Invalid url: Expected 'org:module:version', found " + 
authority)
+}
+
+val uriQuery = uri.getQuery
+val queryParams: Array[(String, String)] = if (uriQuery == null) {
+  Array.empty[(String, String)]
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(_.length != 2)) {
+throw new URISyntaxException(uriQuery, s"Invalid query string: 
$uriQuery")
+  }
+  mapTokens.map(kv => (kv(0), kv(1)))
+}
+
+resolveMavenDependencies(
+  parseTransitive(queryParams.filter(_._1.equals("transitive")).map(_._2)),
+  parseExcludeList(queryParams.filter(_._1.equals("exclude")).map(_._2)),

Review comment:
   Yea.






[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZhuuuu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533099768



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {

Review comment:
   Yea, thanks for your advice.








[GitHub] [spark] maropu commented on pull request #30532: [MINOR] Spelling sql not core

2020-11-30 Thread GitBox


maropu commented on pull request #30532:
URL: https://github.com/apache/spark/pull/30532#issuecomment-736249327


   Jenkins is down now, so could you re-invoke the GA tests?






[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZhuuuu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533095376



##
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##
@@ -33,15 +33,30 @@ ADD JAR file_name
 
 * **file_name**
 
-The name of the JAR file to be added. It could be either on a local file 
system or a distributed file system.
+The name of the JAR file to be added. It could be either on a local file 
system or a distributed file system or an ivy URL.
+Apache Ivy is a popular dependency manager focusing on flexibility and 
simplicity. Now we support two parameter in URL query string:
 
+ * transitive: whether to download dependent jars related to your ivy URL.
+ * exclude: exclusion list when download ivy URL jar and dependent jars.
+
+User can write ivy URL such as:
+
+  ivy://group:module:version
+  ivy://group:module:version?transitive=true

Review comment:
   Done
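
   For reference, a usage sketch built from the examples above (the coordinates are the ones used in this PR's tests; the query-string syntax is whatever the final docs describe):

   ```scala
   // Fetch the artifact plus its transitive dependencies, excluding one group:module pair.
   sc.addJar("ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0" +
     "?transitive=true&exclude=org.scala-js:scalajs-library_2.12")
   ```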








[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZhuuuu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533095212



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean
+}
+  }
+
+  /**
+   * Download Ivy URIs dependency jars.
+   *
+   * @param uri Ivy uri need to be downloaded. The URI format should be:
+   *  `ivy://group:module:version[?query]`
+   *Ivy URI query part format should be:
+   *  `parameter=value&parameter=value...`
+   *Note that currently ivy URI query part support two parameters:
+   * 1. transitive: whether to download dependent jars related to 
your ivy URL.
+   *transitive=false or `transitive=true`, if not set, the 
default value is false.
+   * 2. exclude: exclusion list when download ivy URL jar and 
dependency jars.
+   *The `exclude` parameter content is a ',' separated 
`group:module` pair string :
+   *`exclude=group:module,group:module...`
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {

Review comment:
   Change this 








[GitHub] [spark] LuciferYang commented on a change in pull request #30547: [SPARK-33557][CORE][MESOS][TEST] Ensure the relationship between STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and NETWORK_TIMEOUT

2020-11-30 Thread GitBox


LuciferYang commented on a change in pull request #30547:
URL: https://github.com/apache/spark/pull/30547#discussion_r533094765



##
File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
##
@@ -80,7 +80,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, 
clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor 
was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = 
sc.conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
+  private val executorTimeoutMs = 
Utils.blockManagerHeartbeatTimeoutAsMs(sc.conf)

Review comment:
   So if we use `fallbackConf`, we need to change the name of `spark.storage.blockManagerHeartbeatTimeoutMs` to remove the suffix `Ms`.









[GitHub] [spark] LuciferYang commented on a change in pull request #30547: [SPARK-33557][CORE][MESOS][TEST] Ensure the relationship between STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and NETWORK_TIMEOUT

2020-11-30 Thread GitBox


LuciferYang commented on a change in pull request #30547:
URL: https://github.com/apache/spark/pull/30547#discussion_r533093278



##
File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
##
@@ -80,7 +80,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, 
clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor 
was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = 
sc.conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
+  private val executorTimeoutMs = 
Utils.blockManagerHeartbeatTimeoutAsMs(sc.conf)

Review comment:
   definition `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` from 
   ```
  private[spark] val STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT =
    ConfigBuilder("spark.storage.blockManagerHeartbeatTimeoutMs")
      .version("0.7.0")
      .withAlternative("spark.storage.blockManagerSlaveTimeoutMs")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString)
   ```
   to 
   ```
  private[spark] val STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT =
    ConfigBuilder("spark.storage.blockManagerHeartbeatTimeoutMs")
      .version("0.7.0")
      .withAlternative("spark.storage.blockManagerSlaveTimeoutMs")
      .fallbackConf(Network.NETWORK_TIMEOUT)
   ```
   ?
   
   If so, the TimeUnit of `STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT` will change 
from `TimeUnit.MILLISECONDS` to `TimeUnit.SECONDS`, and the default value returned 
by `conf.get` will change from `120000` (milliseconds) to `120` (seconds).
   This does not match the `Ms` suffix in the key name 
`spark.storage.blockManagerHeartbeatTimeoutMs`.
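
   For illustration only, here is a minimal sketch of how a helper such as 
`Utils.blockManagerHeartbeatTimeoutAsMs` could keep millisecond semantics while 
still falling back to `spark.network.timeout`. The body below is an assumption 
for discussion, not the code in this PR:

   ```scala
   import org.apache.spark.SparkConf
   import org.apache.spark.network.util.JavaUtils

   // Hypothetical helper, shown only to illustrate the unit handling discussed above.
   def blockManagerHeartbeatTimeoutAsMs(conf: SparkConf): Long = {
     conf.getOption("spark.storage.blockManagerHeartbeatTimeoutMs")
       .orElse(conf.getOption("spark.storage.blockManagerSlaveTimeoutMs"))
       // The legacy keys carry an "Ms" suffix, so unit-less values are milliseconds.
       .map(v => JavaUtils.timeStringAsMs(v))
       // Fall back to spark.network.timeout, whose default "120s" is second-based.
       .getOrElse(conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
   }
   ```

   With this shape the caller still gets `120000` by default, so the `Ms`-suffixed 
key names stay accurate.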





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZh commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533091418



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1869,15 +1869,15 @@ class SparkContext(config: SparkConf) extends Logging {
   throw new IllegalArgumentException(
 s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
 }
-env.rpcEnv.fileServer.addJar(file)
+Array(env.rpcEnv.fileServer.addJar(file))
   } catch {
 case NonFatal(e) =>
   logError(s"Failed to add $path to Spark environment", e)
-  null
+  Array.empty

Review comment:
   Yea





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #30212: [SPARK-33308][SQL] Refactor current grouping analytics

2020-11-30 Thread GitBox


maropu commented on a change in pull request #30212:
URL: https://github.com/apache/spark/pull/30212#discussion_r533090836



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -2027,65 +1952,76 @@ class Analyzer(override val catalogManager: 
CatalogManager)
*/
   object ResolveFunctions extends Rule[LogicalPlan] {
 val trimWarningEnabled = new AtomicBoolean(true)
+
+def resolveFunction(): PartialFunction[Expression, Expression] = {
+  case u if !u.childrenResolved => u // Skip until children are resolved.
+  case u: UnresolvedAttribute if resolver(u.name, 
VirtualColumn.hiveGroupingIdName) =>
+withPosition(u) {
+  Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
+}
+  case u @ UnresolvedGenerator(name, children) =>
+withPosition(u) {
+  v1SessionCatalog.lookupFunction(name, children) match {
+case generator: Generator => generator
+case other =>
+  failAnalysis(s"$name is expected to be a generator. However, " +
+s"its class is ${other.getClass.getCanonicalName}, which is 
not a generator.")
+  }
+}
+  case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) =>
+withPosition(u) {
+  v1SessionCatalog.lookupFunction(funcId, arguments) match {
+// AggregateWindowFunctions are AggregateFunctions that can only 
be evaluated within
+// the context of a Window clause. They do not need to be wrapped 
in an
+// AggregateExpression.
+case wf: AggregateWindowFunction =>
+  if (isDistinct || filter.isDefined) {
+failAnalysis("DISTINCT or FILTER specified, " +
+  s"but ${wf.prettyName} is not an aggregate function")
+  } else {
+wf
+  }
+// We get an aggregate function, we need to wrap it in an 
AggregateExpression.
+case agg: AggregateFunction =>
+  if (filter.isDefined && !filter.get.deterministic) {
+failAnalysis("FILTER expression is non-deterministic, " +
+  "it cannot be used in aggregate functions")
+  }
+  AggregateExpression(agg, Complete, isDistinct, filter)
+// This function is not an aggregate function, just return the 
resolved one.
+case other if (isDistinct || filter.isDefined) =>
+  failAnalysis("DISTINCT or FILTER specified, " +
+s"but ${other.prettyName} is not an aggregate function")
+case e: String2TrimExpression if arguments.size == 2 =>
+  if (trimWarningEnabled.get) {
+logWarning("Two-parameter TRIM/LTRIM/RTRIM function signatures 
are deprecated." +
+  " Use SQL syntax `TRIM((BOTH | LEADING | TRAILING)? trimStr 
FROM str)`" +
+  " instead.")
+trimWarningEnabled.set(false)
+  }
+  e
+case other =>
+  other
+  }
+}
+}
+
 def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
   // Resolve functions with concrete relations from v2 catalog.
   case UnresolvedFunc(multipartIdent) =>
 val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent)
 ResolvedFunc(Identifier.of(funcIdent.database.toArray, 
funcIdent.funcName))
 
-  case q: LogicalPlan =>
-q transformExpressions {
-  case u if !u.childrenResolved => u // Skip until children are 
resolved.
-  case u: UnresolvedAttribute if resolver(u.name, 
VirtualColumn.hiveGroupingIdName) =>
-withPosition(u) {
-  Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
-}
-  case u @ UnresolvedGenerator(name, children) =>
-withPosition(u) {
-  v1SessionCatalog.lookupFunction(name, children) match {
-case generator: Generator => generator
-case other =>
-  failAnalysis(s"$name is expected to be a generator. However, 
" +
-s"its class is ${other.getClass.getCanonicalName}, which 
is not a generator.")
-  }
-}
-  case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) =>
-withPosition(u) {
-  v1SessionCatalog.lookupFunction(funcId, arguments) match {
-// AggregateWindowFunctions are AggregateFunctions that can 
only be evaluated within
-// the context of a Window clause. They do not need to be 
wrapped in an
-// AggregateExpression.
-case wf: AggregateWindowFunction =>
-  if (isDistinct || filter.isDefined) {
-failAnalysis("DISTINCT or FILTER specified, " +
-  s"but ${wf.prettyName} is not an 

[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZh commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533090454



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1860,7 +1860,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
-def addLocalJarFile(file: File): String = {
+def addLocalJarFile(file: File): Array[String] = {

Review comment:
   Yea

##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1869,15 +1869,15 @@ class SparkContext(config: SparkConf) extends Logging {
   throw new IllegalArgumentException(
 s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
 }
-env.rpcEnv.fileServer.addJar(file)
+Array(env.rpcEnv.fileServer.addJar(file))
   } catch {
 case NonFatal(e) =>
   logError(s"Failed to add $path to Spark environment", e)
-  null
+  Array.empty
   }
 }
 
-def checkRemoteJarFile(path: String): String = {
+def checkRemoteJarFile(path: String): Array[String] = {

Review comment:
   Yea





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AngersZhuuuu commented on pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZh commented on pull request #29966:
URL: https://github.com/apache/spark/pull/29966#issuecomment-736240432


   > For other reviewers, could you put the screenshot of the updated doc in 
the PR description?
   
   
![image](https://user-images.githubusercontent.com/46485123/100702546-d12ad900-33dc-11eb-90f3-95ad0200c37c.png)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AngersZhuuuu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


AngersZh commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533088945



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean
+}
+  }
+
+  /**
+   * Download Ivy URIs dependency jars.
+   *
+   * @param uri Ivy uri need to be downloaded. The URI format should be:
+   *  `ivy://group:module:version[?query]`
+   *Ivy URI query part format should be:
+   *  `parameter=value=value...`
+   *Note that currently ivy URI query part support two parameters:
+   * 1. transitive: whether to download dependent jars related to 
your ivy URL.
+   *transitive=false or `transitive=true`, if not set, the 
default value is false.
+   * 2. exclude: exclusion list when download ivy URL jar and 
dependency jars.
+   *The `exclude` parameter content is a ',' separated 
`group:module` pair string :
+   *`exclude=group:module,group:module...`
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+val Seq(_, _, repositories, ivyRepoPath, ivySettingsPath) =
+  DependencyUtils.getIvyProperties()
+val authority = uri.getAuthority
+if (authority == null) {
+  throw new URISyntaxException(
+authority, "Invalid url: Expected 'org:module:version', found null")
+}
+if (authority.split(":").length != 3) {
+  throw new URISyntaxException(
+authority, "Invalid url: Expected 'org:module:version', found " + 
authority)
+}
+
+val uriQuery = uri.getQuery
+val queryParams: Array[(String, String)] = if (uriQuery == null) {
+  Array.empty[(String, String)]
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(_.length != 2)) {
+throw new URISyntaxException(uriQuery, s"Invalid query string: 
$uriQuery")
+  }
+  mapTokens.map(kv => (kv(0), kv(1)))
+}
+
+resolveMavenDependencies(
+  parseTransitive(queryParams.filter(_._1.equals("transitive")).map(_._2)),

Review comment:
   > We need to respect case-sensitivity for params?
   
   Normally we use lowercase; respecting case-sensitivity may be good, and Hive respects it too.
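
   If case-insensitive matching were wanted instead, one purely illustrative 
option (an assumption, not what this PR does) would be to compare keys with 
`equalsIgnoreCase`:

   ```scala
   // Hypothetical: treat "transitive", "Transitive", "TRANSITIVE", ... as the same key.
   val queryParams: Array[(String, String)] =
     Array("Transitive" -> "true", "exclude" -> "org.mortbay.jetty:jetty")

   val transitive = queryParams
     .filter { case (k, _) => k.equalsIgnoreCase("transitive") }
     .map(_._2)
     .lastOption            // keep the "last one wins" behavior of parseTransitive
     .exists(_.toBoolean)   // transitive == true
   ```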





This is an automated message from the Apache Git Service.
To 

[GitHub] [spark] otterc commented on a change in pull request #30312: [SPARK-32917][SHUFFLE][CORE] Adds support for executors to push shuffle blocks after successful map task completion

2020-11-30 Thread GitBox


otterc commented on a change in pull request #30312:
URL: https://github.com/apache/spark/pull/30312#discussion_r533088124



##
File path: 
core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.File
+import java.net.ConnectException
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark._
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockFetchingListener, 
BlockStoreClient}
+import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
+import org.apache.spark.network.util.TransportConf
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.storage._
+
+class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = 
_
+  @Mock(answer = RETURNS_SMART_NULLS) private var dependency: 
ShuffleDependency[Int, Int, Int] = _
+  @Mock(answer = RETURNS_SMART_NULLS) private var shuffleClient: 
BlockStoreClient = _
+
+  private var conf: SparkConf = _
+  private var pushedBlocks = new ArrayBuffer[String]
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+conf = new SparkConf(loadDefaults = false)
+MockitoAnnotations.initMocks(this)
+when(dependency.partitioner).thenReturn(new HashPartitioner(8))
+when(dependency.serializer).thenReturn(new JavaSerializer(conf))
+
when(dependency.getMergerLocs).thenReturn(Seq(BlockManagerId("test-client", 
"test-client", 1)))
+conf.set("spark.shuffle.push.based.enabled", "true")
+conf.set("spark.shuffle.service.enabled", "true")
+// Set the env because the shuffler writer gets the shuffle client 
instance from the env.
+val mockEnv = mock(classOf[SparkEnv])
+when(mockEnv.conf).thenReturn(conf)
+when(mockEnv.blockManager).thenReturn(blockManager)
+SparkEnv.set(mockEnv)
+when(blockManager.blockStoreClient).thenReturn(shuffleClient)
+  }
+
+  override def afterEach(): Unit = {
+pushedBlocks.clear()
+super.afterEach()
+  }
+
+  private def interceptPushedBlocksForSuccess(): Unit = {
+when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
+  .thenAnswer((invocation: InvocationOnMock) => {
+val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
+pushedBlocks ++= blocks
+val managedBuffers = 
invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
+val blockFetchListener = 
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+(blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
+  blockFetchListener.onBlockFetchSuccess(blockId, buffer)
+})
+  })
+  }
+
+  test("Basic block push") {
+interceptPushedBlocksForSuccess()
+new TestShuffleBlockPusher(mock(classOf[File]),
+  Array.fill(dependency.partitioner.numPartitions) { 2 }, dependency, 0, 
conf)
+.initiateBlockPush()
+verify(shuffleClient, times(1))
+  .pushBlocks(any(), any(), any(), any(), any())
+assert(pushedBlocks.length == dependency.partitioner.numPartitions)
+ShuffleBlockPusher.stop()
+  }
+
+  test("Large blocks are skipped for push") {
+conf.set("spark.shuffle.push.maxBlockSizeToPush", "1k")
+interceptPushedBlocksForSuccess()
+new TestShuffleBlockPusher(mock(classOf[File]), Array(2, 2, 2, 2, 2, 2, 2, 
1100),
+  dependency, 0, conf).initiateBlockPush()
+verify(shuffleClient, times(1))
+  .pushBlocks(any(), any(), any(), any(), any())
+assert(pushedBlocks.length == dependency.partitioner.numPartitions - 1)
+ShuffleBlockPusher.stop()
+  }
+
+  test("Number of blocks in flight per address are limited by 
maxBlocksInFlightPerAddress") {
+

[GitHub] [spark] AngersZhuuuu commented on a change in pull request #30212: [SPARK-33308][SQL] Refactor current grouping analytics

2020-11-30 Thread GitBox


AngersZh commented on a change in pull request #30212:
URL: https://github.com/apache/spark/pull/30212#discussion_r533087740



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -2027,65 +1952,76 @@ class Analyzer(override val catalogManager: 
CatalogManager)
*/
   object ResolveFunctions extends Rule[LogicalPlan] {
 val trimWarningEnabled = new AtomicBoolean(true)
+
+def resolveFunction(): PartialFunction[Expression, Expression] = {
+  case u if !u.childrenResolved => u // Skip until children are resolved.
+  case u: UnresolvedAttribute if resolver(u.name, 
VirtualColumn.hiveGroupingIdName) =>
+withPosition(u) {
+  Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
+}
+  case u @ UnresolvedGenerator(name, children) =>
+withPosition(u) {
+  v1SessionCatalog.lookupFunction(name, children) match {
+case generator: Generator => generator
+case other =>
+  failAnalysis(s"$name is expected to be a generator. However, " +
+s"its class is ${other.getClass.getCanonicalName}, which is 
not a generator.")
+  }
+}
+  case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) =>
+withPosition(u) {
+  v1SessionCatalog.lookupFunction(funcId, arguments) match {
+// AggregateWindowFunctions are AggregateFunctions that can only 
be evaluated within
+// the context of a Window clause. They do not need to be wrapped 
in an
+// AggregateExpression.
+case wf: AggregateWindowFunction =>
+  if (isDistinct || filter.isDefined) {
+failAnalysis("DISTINCT or FILTER specified, " +
+  s"but ${wf.prettyName} is not an aggregate function")
+  } else {
+wf
+  }
+// We get an aggregate function, we need to wrap it in an 
AggregateExpression.
+case agg: AggregateFunction =>
+  if (filter.isDefined && !filter.get.deterministic) {
+failAnalysis("FILTER expression is non-deterministic, " +
+  "it cannot be used in aggregate functions")
+  }
+  AggregateExpression(agg, Complete, isDistinct, filter)
+// This function is not an aggregate function, just return the 
resolved one.
+case other if (isDistinct || filter.isDefined) =>
+  failAnalysis("DISTINCT or FILTER specified, " +
+s"but ${other.prettyName} is not an aggregate function")
+case e: String2TrimExpression if arguments.size == 2 =>
+  if (trimWarningEnabled.get) {
+logWarning("Two-parameter TRIM/LTRIM/RTRIM function signatures 
are deprecated." +
+  " Use SQL syntax `TRIM((BOTH | LEADING | TRAILING)? trimStr 
FROM str)`" +
+  " instead.")
+trimWarningEnabled.set(false)
+  }
+  e
+case other =>
+  other
+  }
+}
+}
+
 def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
   // Resolve functions with concrete relations from v2 catalog.
   case UnresolvedFunc(multipartIdent) =>
 val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent)
 ResolvedFunc(Identifier.of(funcIdent.database.toArray, 
funcIdent.funcName))
 
-  case q: LogicalPlan =>
-q transformExpressions {
-  case u if !u.childrenResolved => u // Skip until children are 
resolved.
-  case u: UnresolvedAttribute if resolver(u.name, 
VirtualColumn.hiveGroupingIdName) =>
-withPosition(u) {
-  Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
-}
-  case u @ UnresolvedGenerator(name, children) =>
-withPosition(u) {
-  v1SessionCatalog.lookupFunction(name, children) match {
-case generator: Generator => generator
-case other =>
-  failAnalysis(s"$name is expected to be a generator. However, 
" +
-s"its class is ${other.getClass.getCanonicalName}, which 
is not a generator.")
-  }
-}
-  case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) =>
-withPosition(u) {
-  v1SessionCatalog.lookupFunction(funcId, arguments) match {
-// AggregateWindowFunctions are AggregateFunctions that can 
only be evaluated within
-// the context of a Window clause. They do not need to be 
wrapped in an
-// AggregateExpression.
-case wf: AggregateWindowFunction =>
-  if (isDistinct || filter.isDefined) {
-failAnalysis("DISTINCT or FILTER specified, " +
-  s"but ${wf.prettyName} is not an 

[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533087617



##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean
+}
+  }
+
+  /**
+   * Download Ivy URIs dependency jars.
+   *
+   * @param uri Ivy uri need to be downloaded. The URI format should be:
+   *  `ivy://group:module:version[?query]`
+   *Ivy URI query part format should be:
+   *  `parameter=value=value...`
+   *Note that currently ivy URI query part support two parameters:
+   * 1. transitive: whether to download dependent jars related to 
your ivy URL.
+   *transitive=false or `transitive=true`, if not set, the 
default value is false.
+   * 2. exclude: exclusion list when download ivy URL jar and 
dependency jars.
+   *The `exclude` parameter content is a ',' separated 
`group:module` pair string :
+   *`exclude=group:module,group:module...`
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): String = {
+val Seq(_, _, repositories, ivyRepoPath, ivySettingsPath) =
+  DependencyUtils.getIvyProperties()
+val authority = uri.getAuthority
+if (authority == null) {
+  throw new URISyntaxException(
+authority, "Invalid url: Expected 'org:module:version', found null")
+}
+if (authority.split(":").length != 3) {
+  throw new URISyntaxException(
+authority, "Invalid url: Expected 'org:module:version', found " + 
authority)
+}
+
+val uriQuery = uri.getQuery
+val queryParams: Array[(String, String)] = if (uriQuery == null) {
+  Array.empty[(String, String)]
+} else {
+  val mapTokens = uriQuery.split("&").map(_.split("="))
+  if (mapTokens.exists(_.length != 2)) {
+throw new URISyntaxException(uriQuery, s"Invalid query string: 
$uriQuery")
+  }
+  mapTokens.map(kv => (kv(0), kv(1)))
+}
+
+resolveMavenDependencies(
+  parseTransitive(queryParams.filter(_._1.equals("transitive")).map(_._2)),

Review comment:
   We need to respect case-sensitivity for params?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the 

[GitHub] [spark] maropu commented on pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on pull request #29966:
URL: https://github.com/apache/spark/pull/29966#issuecomment-736236902


   For other reviewers, could you put the screenshot of the updated doc in the 
PR description?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533086750



##
File path: docs/sql-ref-syntax-aux-resource-mgmt-add-jar.md
##
@@ -33,15 +33,30 @@ ADD JAR file_name
 
 * **file_name**
 
-The name of the JAR file to be added. It could be either on a local file 
system or a distributed file system.
+The name of the JAR file to be added. It could be either on a local file 
system or a distributed file system or an ivy URL.
+Apache Ivy is a popular dependency manager focusing on flexibility and 
simplicity. Now we support two parameter in URL query string:
 
+ * transitive: whether to download dependent jars related to your ivy URL.
+ * exclude: exclusion list when download ivy URL jar and dependent jars.
+
+User can write ivy URL such as:
+
+  ivy://group:module:version
+  ivy://group:module:version?transitive=true

Review comment:
   `transitive=true` -> `transitive=[true|false]`?
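
   For readers of this doc page, a usage sketch based on the syntax described 
above, assuming a `spark` session; the coordinates are made up for illustration:

   ```scala
   // Hypothetical coordinates, only to show the documented ivy:// syntax.
   spark.sql("ADD JAR ivy://org.example:some-udf:1.0.0")
   spark.sql("ADD JAR ivy://org.example:some-udf:1.0.0?transitive=true")
   spark.sql("ADD JAR ivy://org.example:some-udf:1.0.0" +
     "?transitive=true&exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http")
   ```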





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR closed pull request #28363: [SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files

2020-11-30 Thread GitBox


HeartSaVioR closed pull request #28363:
URL: https://github.com/apache/spark/pull/28363


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on pull request #28363: [SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files

2020-11-30 Thread GitBox


HeartSaVioR commented on pull request #28363:
URL: https://github.com/apache/spark/pull/28363#issuecomment-736234860


   Thanks all for the thoughtful reviews! Merging to master.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533081191



##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -366,6 +366,12 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 }
   }
 
+  test("add jar local path with comma") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local"))
+sc.addJar("file://Test,UDTF.jar")
+assert(!sc.listJars().exists(_.contains("UDTF.jar")))

Review comment:
   What does this test mean? Why didn't you write it like ` 
assert(sc.listJars().exists(_.contains("Test,UDTF.jar")))`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #29966: [SPARK-33084][CORE][SQL] Add jar support ivy path

2020-11-30 Thread GitBox


maropu commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533076491



##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1860,7 +1860,7 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   private def addJar(path: String, addedOnSubmit: Boolean): Unit = {
-def addLocalJarFile(file: File): String = {
+def addLocalJarFile(file: File): Array[String] = {

Review comment:
   `Array` -> `Seq`

##
File path: core/src/test/scala/org/apache/spark/SparkContextSuite.scala
##
@@ -366,6 +366,12 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 }
   }
 
+  test("add jar local path with comma") {
+sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local"))

Review comment:
   Could you add tests for other schemas?

##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1869,15 +1869,15 @@ class SparkContext(config: SparkConf) extends Logging {
   throw new IllegalArgumentException(
 s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
 }
-env.rpcEnv.fileServer.addJar(file)
+Array(env.rpcEnv.fileServer.addJar(file))
   } catch {
 case NonFatal(e) =>
   logError(s"Failed to add $path to Spark environment", e)
-  null
+  Array.empty
   }
 }
 
-def checkRemoteJarFile(path: String): String = {
+def checkRemoteJarFile(path: String): Array[String] = {

Review comment:
   ditto

##
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##
@@ -15,22 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+Seq(
+  "spark.jars.excludes",
+  "spark.jars.packages",
+  "spark.jars.repositories",
+  "spark.jars.ivy",
+  "spark.jars.ivySettings"
+).map(sys.props.get(_).orNull)
+  }
+
+  /**
+   * Parse excluded list in ivy URL. When download ivy URL jar, Spark won't 
download transitive jar
+   * in excluded list.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of exclude.
+   * Example: Input:  
exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
+   * Output:  [org.mortbay.jetty:jetty, org.eclipse.jetty:jetty-http]
+   */
+  private def parseExcludeList(excludes: Array[String]): String = {
+excludes.flatMap { excludeString =>
+  val excludes: Array[String] = excludeString.split(",")
+  if (excludes.exists(_.split(":").length != 2)) {
+throw new URISyntaxException(excludeString,
+  "Invalid exclude string: expected 'org:module,org:module,..', found 
" + excludeString)
+  }
+  excludes
+}.mkString(":")
+  }
+
+  /**
+   * Parse transitive parameter in ivy URL, default value is false.
+   *
+   * @param queryString Ivy URI query part string.
+   * @return Exclude list which contains grape parameters of transitive.
+   * Example: Input:  exclude=org.mortbay.jetty:jetty=true
+   * Output:  true
+   */
+  private def parseTransitive(transitives: Array[String]): Boolean = {
+if (transitives.isEmpty) {
+  false
+} else {
+  if (transitives.length > 1) {
+logWarning("It's best to specify `transitive` parameter in ivy URL 
query only once." +
+  " If there are multiple `transitive` parameter, we will select the 
last one")
+  }
+  transitives.last.toBoolean

Review comment:
   What happens in Hive if `transitive=invalidStr`? Could you add tests?
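
   For context on that question: Scala's `String.toBoolean` only accepts 
`true`/`false` (case-insensitively) and throws for anything else, so with the 
current code an invalid value surfaces as a raw exception:

   ```scala
   // Behavior of the toBoolean call used by parseTransitive above.
   "true".toBoolean        // true
   "FALSE".toBoolean       // false (parsing is case-insensitive)
   "invalidStr".toBoolean  // throws java.lang.IllegalArgumentException
   ```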

##
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##
@@ -1869,15 +1869,15 @@ class SparkContext(config: SparkConf) extends Logging {
   throw new IllegalArgumentException(
 s"Directory ${file.getAbsoluteFile} is not allowed for addJar")
 }
-env.rpcEnv.fileServer.addJar(file)
+Array(env.rpcEnv.fileServer.addJar(file))
   } catch {
 case NonFatal(e) =>
   logError(s"Failed to add $path to Spark environment", e)
-  null
+  Array.empty

Review comment:
   Then, `Nil`

##
File path: 

[GitHub] [spark] zsxwing commented on pull request #28363: [SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files

2020-11-30 Thread GitBox


zsxwing commented on pull request #28363:
URL: https://github.com/apache/spark/pull/28363#issuecomment-736230794


   LGTM. Thanks for your patience.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a change in pull request #30547: [SPARK-33557][CORE][MESOS][TEST] Ensure the relationship between STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and NETWORK_TIMEOUT

2020-11-30 Thread GitBox


HyukjinKwon commented on a change in pull request #30547:
URL: https://github.com/apache/spark/pull/30547#discussion_r533081349



##
File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
##
@@ -80,7 +80,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, 
clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor 
was received
   private val executorLastSeen = new HashMap[String, Long]
 
-  private val executorTimeoutMs = 
sc.conf.get(config.STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT)
+  private val executorTimeoutMs = 
Utils.blockManagerHeartbeatTimeoutAsMs(sc.conf)

Review comment:
   Can you use `fallbackConf` at the definition of this configuration at 
`config/package.scala`?
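
   For reference, a minimal illustration of what switching to `fallbackConf` would 
change for callers; the numbers assume the default `spark.network.timeout` of 
`120s`, and the helper name below is hypothetical, not code from this PR:

   ```scala
   import java.util.concurrent.TimeUnit

   // With the current timeConf(MILLISECONDS) + createWithDefaultString("120s"),
   // conf.get(STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT) returns 120000 (milliseconds).
   // With fallbackConf(Network.NETWORK_TIMEOUT), whose unit is seconds, it would
   // return 120 instead, so callers such as HeartbeatReceiver would need to convert:
   def toExecutorTimeoutMs(timeoutFromConf: Long): Long =
     TimeUnit.SECONDS.toMillis(timeoutFromConf)  // 120 -> 120000
   ```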





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #30547: [SPARK-33557][CORE][MESOS][TEST] Ensure the relationship between STORAGE_BLOCKMANAGER_HEARTBEAT_TIMEOUT and NETWORK_TIMEOUT

2020-11-30 Thread GitBox


HyukjinKwon commented on pull request #30547:
URL: https://github.com/apache/spark/pull/30547#issuecomment-736225520


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon edited a comment on pull request #30486: [SPARK-33530][CORE] Support --archives and spark.archives option natively

2020-11-30 Thread GitBox


HyukjinKwon edited a comment on pull request #30486:
URL: https://github.com/apache/spark/pull/30486#issuecomment-736214769


   Oh, maybe I will use tar.gz and tgz in the integration test. That will 
address https://github.com/apache/spark/pull/30486#discussion_r532678330 
together.
   
   I filed a JIRA at SPARK-33615



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #30486: [SPARK-33530][CORE] Support --archives and spark.archives option natively

2020-11-30 Thread GitBox


HyukjinKwon commented on pull request #30486:
URL: https://github.com/apache/spark/pull/30486#issuecomment-736214769


   Oh, maybe I will use tar.gz and tgz in the integration test. That will 
address https://github.com/apache/spark/pull/30486#discussion_r532678330 
together.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #30486: [SPARK-33530][CORE] Support --archives and spark.archives option natively

2020-11-30 Thread GitBox


HyukjinKwon closed pull request #30486:
URL: https://github.com/apache/spark/pull/30486


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #30486: [SPARK-33530][CORE] Support --archives and spark.archives option natively

2020-11-30 Thread GitBox


HyukjinKwon commented on pull request #30486:
URL: https://github.com/apache/spark/pull/30486#issuecomment-736213589


   Thanks all @dongjoon-hyun @maropu @Ngone51 @mridulm and @tgravescs. Let me 
merge this in.
   I will try to have some time to prepare an IT test with K8S which hopefully 
will be added before Spark 3.1.0 release.
   
   Merged to master.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a change in pull request #30433: [SPARK-32916][SHUFFLE][test-maven][test-hadoop2.7] Ensure the number of chunks in meta file and index file are equal

2020-11-30 Thread GitBox


mridulm commented on a change in pull request #30433:
URL: https://github.com/apache/spark/pull/30433#discussion_r533068912



##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##
@@ -827,13 +833,16 @@ void resetChunkTracker() {
 void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
   long idxStartPos = -1;
   try {
-// update the chunk tracker to meta file before index file
-writeChunkTracker(mapIndex);
 idxStartPos = indexFile.getFilePointer();
 logger.trace("{} shuffleId {} reduceId {} updated index current {} 
updated {}",
   appShuffleId.appId, appShuffleId.shuffleId, reduceId, 
this.lastChunkOffset,
   chunkOffset);
-indexFile.writeLong(chunkOffset);
+indexFile.write(Longs.toByteArray(chunkOffset));
+// Chunk bitmap should be written to the meta file after the index 
file because if there are
+// any exceptions during writing the offset to the index file, meta 
file should not be
+// updated. If the update to the index file is successful but the 
update to meta file isn't
+// then the index file position is reset in the catch clause.
+writeChunkTracker(mapIndex);

Review comment:
   Thanks for confirming @otterc, this means seek should not throw an exception 
in almost all cases for us.
   We have two paths here:
   a) Ignore the seek exception as unexpected.
   b) Abort the merge, assuming EBADF/etc, as it is an unrecoverable error.
   
   I am partial towards (b) as we cannot recover from a seek exception. Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a change in pull request #30433: [SPARK-32916][SHUFFLE][test-maven][test-hadoop2.7] Ensure the number of chunks in meta file and index file are equal

2020-11-30 Thread GitBox


mridulm commented on a change in pull request #30433:
URL: https://github.com/apache/spark/pull/30433#discussion_r533068912



##
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##
@@ -827,13 +833,16 @@ void resetChunkTracker() {
 void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
   long idxStartPos = -1;
   try {
-// update the chunk tracker to meta file before index file
-writeChunkTracker(mapIndex);
 idxStartPos = indexFile.getFilePointer();
 logger.trace("{} shuffleId {} reduceId {} updated index current {} 
updated {}",
   appShuffleId.appId, appShuffleId.shuffleId, reduceId, 
this.lastChunkOffset,
   chunkOffset);
-indexFile.writeLong(chunkOffset);
+indexFile.write(Longs.toByteArray(chunkOffset));
+// Chunk bitmap should be written to the meta file after the index 
file because if there are
+// any exceptions during writing the offset to the index file, meta 
file should not be
+// updated. If the update to the index file is successful but the 
update to meta file isn't
+// then the index file position is reset in the catch clause.
+writeChunkTracker(mapIndex);

Review comment:
   Thanks for confirming @otterc.
   We have two paths here:
   a) Ignore the seek exception as unexpected.
   b) Abort the merge, assuming EBADF/etc, as it is an unrecoverable error.
   
   I am partial towards (b) as we cannot recover from a seek exception. Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #30533: [SPARK-33380] [EXAMPLES] New python example (series-pi.py) to calculate pi via series

2020-11-30 Thread GitBox


HyukjinKwon closed pull request #30533:
URL: https://github.com/apache/spark/pull/30533


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon edited a comment on pull request #30554: [SPARK-30098][SQL] Use default datasource as provider for CREATE TABLE command

2020-11-30 Thread GitBox


HyukjinKwon edited a comment on pull request #30554:
URL: https://github.com/apache/spark/pull/30554#issuecomment-736208612


   I agree with the point about describing things and explaining the rationale. 
I would have expected a detailed PR description as well.
   
   Of course, enough discussion should be made for a significant change. I don't 
believe this PR was intended to be pushed without it. Also, I don't believe such 
discussions have to happen in a specific place like the mailing list. They can 
happen in JIRA, PR, etc.
   
   One alternative is just to turn this switch off by default, issue warnings 
and turn it on later in another release in the future.
   
   Just to make it extra clear, what this PR changes looks correct and 
straightforward to me. Spark should create a Spark table when users `CREATE 
TABLE`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AngersZhuuuu commented on pull request #30556: [WIP][SPARK-33212][BUILD] Provide hadoop-aws-shaded jar in hadoop-cloud module

2020-11-30 Thread GitBox


AngersZh commented on pull request #30556:
URL: https://github.com/apache/spark/pull/30556#issuecomment-736210125


   All right, there is still a lot of work to be done to integrate these engines 
with Hadoop.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #30554: [SPARK-30098][SQL] Use default datasource as provider for CREATE TABLE command

2020-11-30 Thread GitBox


HyukjinKwon commented on pull request #30554:
URL: https://github.com/apache/spark/pull/30554#issuecomment-736208612


   I agree with the point about describing things and explaining the rationale. 
I would have expected a detailed PR description as well.
   
   Of course, enough discussion should be made for a significant change. I don't 
believe this PR was intended to be pushed without it - I don't believe such 
discussions have to happen in a specific place like the mailing list. They can 
happen in JIRA, PR, etc.
   
   One alternative is just to turn this switch off by default, issue warnings 
and turn it on later in another release in the future.
   
   Just to make it extra clear, what this PR changes looks correct and 
straightforward to me. Spark should create a Spark table when users `CREATE 
TABLE`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR closed pull request #27649: [SPARK-30900][SS] FileStreamSource: Avoid reading compact metadata log twice if the query restarts from compact batch

2020-11-30 Thread GitBox


HeartSaVioR closed pull request #27649:
URL: https://github.com/apache/spark/pull/27649


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on pull request #27649: [SPARK-30900][SS] FileStreamSource: Avoid reading compact metadata log twice if the query restarts from compact batch

2020-11-30 Thread GitBox


HeartSaVioR commented on pull request #27649:
URL: https://github.com/apache/spark/pull/27649#issuecomment-736205169


   GA passed. Merging to master.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] luluorta commented on a change in pull request #29882: [SPARK-33008][SQL] Division by zero on divide-like operations returns incorrect result

2020-11-30 Thread GitBox


luluorta commented on a change in pull request #29882:
URL: https://github.com/apache/spark/pull/29882#discussion_r533058089



##
File path: 
sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
##
@@ -49,6 +49,7 @@ SELECT 1 AS one FROM test_having HAVING a > 1;
 SELECT 1 AS one FROM test_having HAVING 1 > 2;
 SELECT 1 AS one FROM test_having HAVING 1 < 2;
 
+-- [SPARK-33008] Spark SQL throws an exception

Review comment:
   I found that PostgreSQL returns correct results without errors for both `1 
< 2` and `1 > 2`, while Spark SQL always scans the table and throws an exception 
because its optimizer does not take this case into account.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AngersZhuuuu commented on pull request #30421: [SPARK-33474][SQL] Support TypeConstructed partition spec value

2020-11-30 Thread GitBox


AngersZh commented on pull request #30421:
URL: https://github.com/apache/spark/pull/30421#issuecomment-736198521


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #30374: [WIP][SPARK-33444][ML] Added support for Initial model in Gaussian Mixture Model in ML

2020-11-30 Thread GitBox


zhengruifeng commented on pull request #30374:
URL: https://github.com/apache/spark/pull/30374#issuecomment-736196878


   On the `.ml` side, models do not support initialization for now. I think we 
need more discussion to design a general method to support this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on pull request #30556: [WIP][SPARK-33212][BUILD] Provide hadoop-aws-shaded jar in hadoop-cloud module

2020-11-30 Thread GitBox


sunchao commented on pull request #30556:
URL: https://github.com/apache/spark/pull/30556#issuecomment-736196419


   Ah ok. That part, as far as I know, is stuck because Hive has a dependency on an old version of Spark, which blocks it from upgrading Guava. Hopefully that should be unblocked after [HADOOP-17288](https://issues.apache.org/jira/browse/HADOOP-17288) is shipped in the upcoming release (whether that be 3.3.1 or 3.4.0). I'm not sure whether the change will go to the Hive 2.3 branch though.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on pull request #28363: [SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files

2020-11-30 Thread GitBox


HeartSaVioR commented on pull request #28363:
URL: https://github.com/apache/spark/pull/28363#issuecomment-736195621


   Thanks for the detailed review. Just applied both.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AngersZhuuuu commented on pull request #30556: [WIP][SPARK-33212][BUILD] Provide hadoop-aws-shaded jar in hadoop-cloud module

2020-11-30 Thread GitBox


AngersZh commented on pull request #30556:
URL: https://github.com/apache/spark/pull/30556#issuecomment-736194591


   > @AngersZh you mean for Spark to work with Hive and Hadoop 3.3.0, 
right? the major issue is around resolving potential Guava conflicts between 
these components. Hadoop 3.2.1+/3.3.0+ has moved to Guava 27 while Hive/Spark 
are still on Guava 14. One of the motivations to move to the shaded client in 
Spark is to isolate the Guava dependencies on the Hadoop side. Similarly, 
@viirya is working on the above PR to shade Guava from Hive side.
   
   Not only Spark; we also need Hive to support running with Hadoop 3.3.0, and that doesn't look finished yet.
   https://issues.apache.org/jira/browse/HIVE-21569
   https://issues.apache.org/jira/browse/HIVE-22916



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zsxwing commented on pull request #28363: [SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files

2020-11-30 Thread GitBox


zsxwing commented on pull request #28363:
URL: https://github.com/apache/spark/pull/28363#issuecomment-736193634


   > TTL is defined via current timestamp - commit time (the time 
`ManifestFileCommitProtocol.commitJob` is called to write streaming file sink 
metadata log).
   
   Could you update this in the PR description? We are using the file 
modification time now.
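   For readers following along, a minimal sketch of the retention rule as described in this thread, assuming illustrative names (`OutputFileEntry`, `retentionMs`) rather than the PR's actual types: an output file is kept while its age, measured from the file's modification time, stays within the configured TTL.
   ```
   // Illustrative sketch only: OutputFileEntry and retentionMs are stand-ins, not the
   // PR's API. The key point is that age is measured from the file modification time,
   // not from the commit time mentioned in the older PR description.
   final case class OutputFileEntry(path: String, modificationTimeMs: Long)

   def shouldRetain(entry: OutputFileEntry, retentionMs: Long, curTimeMs: Long): Boolean =
     curTimeMs - entry.modificationTimeMs <= retentionMs
   ```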



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on pull request #30556: [WIP][SPARK-33212][BUILD] Provide hadoop-aws-shaded jar in hadoop-cloud module

2020-11-30 Thread GitBox


sunchao commented on pull request #30556:
URL: https://github.com/apache/spark/pull/30556#issuecomment-736193307


   @AngersZh you mean for Spark to work with Hive and Hadoop 3.3.0, right? 
the major issue is around resolving potential Guava conflicts between these 
components. Hadoop 3.2.1+/3.3.0+ has moved to Guava 27 while Hive/Spark are 
still on Guava 14. One of the motivations to move to the shaded client in Spark 
is to isolate the Guava dependencies on the Hadoop side. Similarly, @viirya is 
working on the above PR to shade Guava from Hive side.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zsxwing commented on a change in pull request #28363: [SPARK-27188][SS] FileStreamSink: provide a new option to have retention on output files

2020-11-30 Thread GitBox


zsxwing commented on a change in pull request #28363:
URL: https://github.com/apache/spark/pull/28363#discussion_r533048816



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##
@@ -258,7 +259,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
 try {
   val logs =
 getAllValidBatches(latestId, compactInterval).flatMap { id =>
-  filterInBatch(id)(shouldRetain).getOrElse {
+  val curTime = System.currentTimeMillis()

Review comment:
   nit: we can move this out of the `flatMap` function.
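   As a side note for readers, a tiny self-contained sketch of the suggested change, with `batchIds` and `lookup` standing in for the real `getAllValidBatches`/`filterInBatch` calls: the timestamp is evaluated once before the `flatMap` instead of once per batch id.
   ```
   // Stand-in names only; the point is hoisting System.currentTimeMillis() out of the
   // per-element closure so it is computed a single time.
   val batchIds: Seq[Long] = Seq(0L, 1L, 2L)
   def lookup(id: Long, curTime: Long): Option[String] = Some(s"batch $id at $curTime")

   val curTime = System.currentTimeMillis() // evaluated once, outside the flatMap
   val logs = batchIds.flatMap(id => lookup(id, curTime))
   ```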





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] huaxingao closed pull request #29581: [SPARK-32523][SQL] Override alter table in JDBC dialects

2020-11-30 Thread GitBox


huaxingao closed pull request #29581:
URL: https://github.com/apache/spark/pull/29581


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maropu commented on a change in pull request #30212: [SPARK-33308][SQL] Refactor current grouping analytics

2020-11-30 Thread GitBox


maropu commented on a change in pull request #30212:
URL: https://github.com/apache/spark/pull/30212#discussion_r533046341



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##
@@ -2027,65 +1952,76 @@ class Analyzer(override val catalogManager: 
CatalogManager)
*/
   object ResolveFunctions extends Rule[LogicalPlan] {
 val trimWarningEnabled = new AtomicBoolean(true)
+
+def resolveFunction(): PartialFunction[Expression, Expression] = {
+  case u if !u.childrenResolved => u // Skip until children are resolved.
+  case u: UnresolvedAttribute if resolver(u.name, 
VirtualColumn.hiveGroupingIdName) =>
+withPosition(u) {
+  Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
+}
+  case u @ UnresolvedGenerator(name, children) =>
+withPosition(u) {
+  v1SessionCatalog.lookupFunction(name, children) match {
+case generator: Generator => generator
+case other =>
+  failAnalysis(s"$name is expected to be a generator. However, " +
+s"its class is ${other.getClass.getCanonicalName}, which is 
not a generator.")
+  }
+}
+  case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) =>
+withPosition(u) {
+  v1SessionCatalog.lookupFunction(funcId, arguments) match {
+// AggregateWindowFunctions are AggregateFunctions that can only 
be evaluated within
+// the context of a Window clause. They do not need to be wrapped 
in an
+// AggregateExpression.
+case wf: AggregateWindowFunction =>
+  if (isDistinct || filter.isDefined) {
+failAnalysis("DISTINCT or FILTER specified, " +
+  s"but ${wf.prettyName} is not an aggregate function")
+  } else {
+wf
+  }
+// We get an aggregate function, we need to wrap it in an 
AggregateExpression.
+case agg: AggregateFunction =>
+  if (filter.isDefined && !filter.get.deterministic) {
+failAnalysis("FILTER expression is non-deterministic, " +
+  "it cannot be used in aggregate functions")
+  }
+  AggregateExpression(agg, Complete, isDistinct, filter)
+// This function is not an aggregate function, just return the 
resolved one.
+case other if (isDistinct || filter.isDefined) =>
+  failAnalysis("DISTINCT or FILTER specified, " +
+s"but ${other.prettyName} is not an aggregate function")
+case e: String2TrimExpression if arguments.size == 2 =>
+  if (trimWarningEnabled.get) {
+logWarning("Two-parameter TRIM/LTRIM/RTRIM function signatures 
are deprecated." +
+  " Use SQL syntax `TRIM((BOTH | LEADING | TRAILING)? trimStr 
FROM str)`" +
+  " instead.")
+trimWarningEnabled.set(false)
+  }
+  e
+case other =>
+  other
+  }
+}
+}
+
 def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
   // Resolve functions with concrete relations from v2 catalog.
   case UnresolvedFunc(multipartIdent) =>
 val funcIdent = parseSessionCatalogFunctionIdentifier(multipartIdent)
 ResolvedFunc(Identifier.of(funcIdent.database.toArray, 
funcIdent.funcName))
 
-  case q: LogicalPlan =>
-q transformExpressions {
-  case u if !u.childrenResolved => u // Skip until children are 
resolved.
-  case u: UnresolvedAttribute if resolver(u.name, 
VirtualColumn.hiveGroupingIdName) =>
-withPosition(u) {
-  Alias(GroupingID(Nil), VirtualColumn.hiveGroupingIdName)()
-}
-  case u @ UnresolvedGenerator(name, children) =>
-withPosition(u) {
-  v1SessionCatalog.lookupFunction(name, children) match {
-case generator: Generator => generator
-case other =>
-  failAnalysis(s"$name is expected to be a generator. However, 
" +
-s"its class is ${other.getClass.getCanonicalName}, which 
is not a generator.")
-  }
-}
-  case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) =>
-withPosition(u) {
-  v1SessionCatalog.lookupFunction(funcId, arguments) match {
-// AggregateWindowFunctions are AggregateFunctions that can 
only be evaluated within
-// the context of a Window clause. They do not need to be 
wrapped in an
-// AggregateExpression.
-case wf: AggregateWindowFunction =>
-  if (isDistinct || filter.isDefined) {
-failAnalysis("DISTINCT or FILTER specified, " +
-  s"but ${wf.prettyName} is not an 

[GitHub] [spark] AngersZhuuuu commented on pull request #30556: [WIP][SPARK-33212][BUILD] Provide hadoop-aws-shaded jar in hadoop-cloud module

2020-11-30 Thread GitBox


AngersZh commented on pull request #30556:
URL: https://github.com/apache/spark/pull/30556#issuecomment-736189690


   > @AngersZh yes that should work. On the other hand, if you are building 
Spark with `hadoop-provided` option on, you can also build your own shaded 
`hadoop-aws` jar and put it in the classpath.
   
   Thanks for your suggestion. Are there any new developments in Hive 2.3 support for Hadoop 3.3.0? I saw this PR https://github.com/apache/hive/pull/1356 but I am not sure it will make Hive run well with Hadoop 3.3.0.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on pull request #30556: [WIP][SPARK-33212][BUILD] Provide hadoop-aws-shaded jar in hadoop-cloud module

2020-11-30 Thread GitBox


sunchao commented on pull request #30556:
URL: https://github.com/apache/spark/pull/30556#issuecomment-736188374


   @AngersZh yes that should work. On the other hand, if you are building 
Spark with `hadoop-provided` option on, you can also build your own shaded 
`hadoop-aws` jar and put it in the classpath. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #30521: [SPARK-33577][SS] Add support for V1Table in stream writer table API

2020-11-30 Thread GitBox


HeartSaVioR commented on a change in pull request #30521:
URL: https://github.com/apache/spark/pull/30521#discussion_r533043551



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
##
@@ -304,46 +308,68 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
* @since 3.1.0
*/
   @throws[TimeoutException]
-  def saveAsTable(tableName: String): StreamingQuery = {
-this.source = SOURCE_NAME_TABLE
+  def table(tableName: String): StreamingQuery = {

Review comment:
   Again, I'm not just talking about the name. I'm talking about the usage pattern - at least for DataStreamWriter, for years `start()` has been the only trigger entry starting the query. Isn't that enough reason to have a DataStreamWriterV2 instead of breaking the pattern? Once we have `df.writeStreamTo("table")`, everything is in sync with DataFrameWriterV2 (`df.writeTo("table")`) and there is no confusion.
   
   If we really insist on having two different trigger entries, I'm OK to go with `toTable` but not simply `table`, because it doesn't say intuitively that the method is a terminal operation. We'll have to rely on the return type to see whether it returns a StreamingQuery instance - JVM languages will get help from the IDE, but I'm not quite sure the IDE will guide users for all supported non-JVM languages.
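   To make the pattern concern concrete, here is a toy sketch of the two "trigger" shapes being debated; the names and signatures are illustrative only, not the Spark API.
   ```
   // Toy model: ToyStreamWriter/ToyQuery are stand-ins for DataStreamWriter/StreamingQuery.
   final case class ToyQuery(target: String)

   final class ToyStreamWriter(format: String) {
     def start(path: String): ToyQuery = ToyQuery(s"path:$path")               // today's single trigger entry
     def toTable(tableName: String): ToyQuery = ToyQuery(s"table:$tableName")  // a second, table-targeted trigger
   }

   val byPath  = new ToyStreamWriter("parquet").start("/tmp/out")   // long-standing usage pattern
   val byTable = new ToyStreamWriter("parquet").toTable("events")   // the additional shape under discussion
   ```
   Whether the second shape lives on `DataStreamWriter` itself or on a separate writer (the `df.writeStreamTo("table")` idea above) is exactly the design question being raised here.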





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #30521: [SPARK-33577][SS] Add support for V1Table in stream writer table API

2020-11-30 Thread GitBox


HeartSaVioR commented on a change in pull request #30521:
URL: https://github.com/apache/spark/pull/30521#discussion_r533043551



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
##
@@ -304,46 +308,68 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
* @since 3.1.0
*/
   @throws[TimeoutException]
-  def saveAsTable(tableName: String): StreamingQuery = {
-this.source = SOURCE_NAME_TABLE
+  def table(tableName: String): StreamingQuery = {

Review comment:
   Again, I'm not just talking about the name. I'm talking about the usage pattern - at least for DataStreamWriter, for years `start()` has been the only trigger entry starting the query. Isn't that enough reason to have a DataStreamWriterV2 instead of breaking the pattern? Once we have `df.writeStreamTo("table")`, everything is in sync with DataFrameWriterV2 (`df.writeTo("table")`) and there is no confusion.
   
   If we really insist on having two different trigger entries, I'm OK to go with `toTable` but not simply `table`, because it doesn't say intuitively that the method is a terminal operation. We'll have to rely on the return type to see whether it returns a StreamingQuery instance - JVM languages will get help from the IDE, but I'm not quite sure the IDE won't guide users for all supported non-JVM languages.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on a change in pull request #30521: [SPARK-33577][SS] Add support for V1Table in stream writer table API

2020-11-30 Thread GitBox


HeartSaVioR commented on a change in pull request #30521:
URL: https://github.com/apache/spark/pull/30521#discussion_r533043551



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
##
@@ -304,46 +308,68 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
* @since 3.1.0
*/
   @throws[TimeoutException]
-  def saveAsTable(tableName: String): StreamingQuery = {
-this.source = SOURCE_NAME_TABLE
+  def table(tableName: String): StreamingQuery = {

Review comment:
   Again, I'm not just talking about the name. I'm talking about the usage pattern - at least for DataStreamWriter, for years `start()` has been the only trigger entry starting the query. Isn't that enough reason to have a DataStreamWriterV2 rather than breaking the pattern? Once we have `df.writeStreamTo("table")`, everything is in sync with DataFrameWriterV2 (`df.writeTo("table")`) and there is no confusion.
   
   If we really insist on having two different trigger entries, I'm OK to go with `toTable` but not simply `table`, because it doesn't say intuitively that the method is a terminal operation. We'll have to rely on the return type to see whether it returns a StreamingQuery instance - JVM languages will get help from the IDE, but I'm not quite sure the IDE won't guide users for all supported non-JVM languages.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR closed pull request #30546: [SPARK-33607][SS][WEBUI] Input Rate timeline/histogram aren't rendered if built with Scala 2.13

2020-11-30 Thread GitBox


HeartSaVioR closed pull request #30546:
URL: https://github.com/apache/spark/pull/30546


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR edited a comment on pull request #30546: [SPARK-33607][SS][WEBUI] Input Rate timeline/histogram aren't rendered if built with Scala 2.13

2020-11-30 Thread GitBox


HeartSaVioR edited a comment on pull request #30546:
URL: https://github.com/apache/spark/pull/30546#issuecomment-736180446


   This is a single-line change, and both Jenkins and GA passed. Merging to master. (I'll skip porting it back, as Scala 2.13 compatibility doesn't look to be a target for 3.0.x.)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on pull request #30546: [SPARK-33607][SS][WEBUI] Input Rate timeline/histogram aren't rendered if built with Scala 2.13

2020-11-30 Thread GitBox


HeartSaVioR commented on pull request #30546:
URL: https://github.com/apache/spark/pull/30546#issuecomment-736180446


   This is a single-line change, and both Jenkins and GA passed. Merging to master (as Scala 2.13 compatibility doesn't look to be a target for 3.0.x).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] luluorta commented on a change in pull request #29882: [SPARK-33008][SQL] Division by zero on divide-like operations returns incorrect result

2020-11-30 Thread GitBox


luluorta commented on a change in pull request #29882:
URL: https://github.com/apache/spark/pull/29882#discussion_r533036738



##
File path: 
sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
##
@@ -49,6 +49,7 @@ SELECT 1 AS one FROM test_having HAVING a > 1;
 SELECT 1 AS one FROM test_having HAVING 1 > 2;
 SELECT 1 AS one FROM test_having HAVING 1 < 2;
 
+-- [SPARK-33008] Spark SQL throws an exception

Review comment:
   IIUC, the `HAVING` condition here is evaluated against the globally grouped result set. I think this query is meant to prove that the row count of the global group does not affect the output of an always-true `HAVING` clause.
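   To illustrate the point, a hedged sketch assuming a SparkSession named `spark` and a throwaway table created only for this example; the comments restate the behaviour described above rather than a verified result.
   ```
   // With no GROUP BY, the HAVING predicate is evaluated against a single global group,
   // so the table's row count should not change what an always-true or always-false
   // constant predicate yields.
   spark.range(0, 4).toDF("a").createOrReplaceTempView("test_having")
   spark.sql("SELECT 1 AS one FROM test_having HAVING 1 < 2")  // always true: keeps the global group
   spark.sql("SELECT 1 AS one FROM test_having HAVING 1 > 2")  // always false: filters it out
   ```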





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] brkyvz commented on a change in pull request #30521: [SPARK-33577][SS] Add support for V1Table in stream writer table API

2020-11-30 Thread GitBox


brkyvz commented on a change in pull request #30521:
URL: https://github.com/apache/spark/pull/30521#discussion_r533036448



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
##
@@ -304,46 +308,68 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
* @since 3.1.0
*/
   @throws[TimeoutException]
-  def saveAsTable(tableName: String): StreamingQuery = {
-this.source = SOURCE_NAME_TABLE
+  def table(tableName: String): StreamingQuery = {

Review comment:
   I prefer something like `table`, because:
   ```
   df.writeStream.table("table_name")
   ```
   seems natural to me (though it is missing a `to` to make total sense; `toTable` also makes sense, but then just `table` is shorter). I wouldn't want to use `saveAsTable`, because it's not really saving: it's starting a stream that's continuously saving/updating/overwriting - and the action depends on the `outputMode`.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR edited a comment on pull request #30554: [SPARK-30098][SQL] Use default datasource as provider for CREATE TABLE command

2020-11-30 Thread GitBox


HeartSaVioR edited a comment on pull request #30554:
URL: https://github.com/apache/spark/pull/30554#issuecomment-736177445


   I'd agree to change it even in a minor release if we had discussed this in public for long enough, and put some effort into figuring out the impacts on end users and guiding them in advance (like a roadmap).
   
   I don't think we did any of the things I mentioned. The last discussion before Spark 3.0.0 was mostly a concern about making the change without proper discussion, and we reverted it. Unifying the CREATE TABLE syntax fixes the long-term issue of the two confusing CREATE TABLE syntaxes, but that's it; it's not a justification for changing the default provider.
   
   Changing the default provider for CREATE TABLE is a totally different story. My experience with the Spark community is that we're usually reluctant to make a backward-incompatible change (even when we did, it was for a major release), and sometimes we keep the old behavior as the default even when new functionality is available. I'm surprised the PR description doesn't mention anything about impacts.
   
   I agree this requires enough public discussion before going further. In that discussion we should make clear the benefits of changing this, "AND" all the possible impacts of changing this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gatorsmile commented on pull request #30555: [SPARK-33608][SQL] Handle DELETE/UPDATE/MERGE in PullupCorrelatedPredicates

2020-11-30 Thread GitBox


gatorsmile commented on pull request #30555:
URL: https://github.com/apache/spark/pull/30555#issuecomment-736178529


   cc @dilipbiswal 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gatorsmile commented on a change in pull request #30555: [SPARK-33608][SQL] Handle DELETE/UPDATE/MERGE in PullupCorrelatedPredicates

2020-11-30 Thread GitBox


gatorsmile commented on a change in pull request #30555:
URL: https://github.com/apache/spark/pull/30555#discussion_r533034446



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
##
@@ -328,6 +328,8 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] 
with PredicateHelper
 // Only a few unary nodes (Project/Filter/Aggregate) can contain 
subqueries.
 case q: UnaryNode =>
   rewriteSubQueries(q, q.children)
+case s: SupportsSubquery =>

Review comment:
   Add a comment above this line? To be honest, it is hard to tell that this trait means UPDATE/MERGE/DELETE.
   
   Also, I think this change is only part of the overall changes needed to support subqueries in UPDATE/MERGE/DELETE. We also need the corresponding changes in the Analyzer and Optimizer rules, for example CheckAnalysis.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HeartSaVioR commented on pull request #30554: [SPARK-30098][SQL] Use default datasource as provider for CREATE TABLE command

2020-11-30 Thread GitBox


HeartSaVioR commented on pull request #30554:
URL: https://github.com/apache/spark/pull/30554#issuecomment-736177445


   I'd agree to change it even in a minor release if we had discussed this in public for long enough, and put some effort into figuring out the impacts on end users and guiding them in advance (like a roadmap).
   
   I don't think we did any of the things I mentioned. The last discussion before Spark 3.0.0 was mostly a concern about making the change without proper discussion, and we reverted it. Unifying the CREATE TABLE syntax fixes the long-term issue of the two confusing CREATE TABLE syntaxes, but that's it; it's not a justification for changing the default provider.
   
   Changing the default provider for CREATE TABLE is a totally different story. My experience with the Spark community is that we're usually reluctant to make a backward-incompatible change (even when we did, it was for a major release), and sometimes we keep the old behavior as the default even when new functionality is available. I'm surprised the PR description doesn't mention anything about impacts - do we really think this doesn't break anything?
   
   I agree this requires enough public discussion before going further. In that discussion we should make clear the benefits of changing this, "AND" all the possible impacts of changing this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on pull request #30543: [SPARK-33597][SQL] Support REGEXP_LIKE for consistent with mainstream databases

2020-11-30 Thread GitBox


beliefer commented on pull request #30543:
URL: https://github.com/apache/spark/pull/30543#issuecomment-736176772


   > You forget to update the golden file. Anyway, the alias looks fine to me.
   
   Thanks for the reminder.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #30480: [SPARK-32921][SHUFFLE] MapOutputTracker extensions to support push-based shuffle

2020-11-30 Thread GitBox


Ngone51 commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r533032962



##
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##
@@ -524,10 +669,37 @@ private[spark] class MapOutputTrackerMaster(
 }
   }
 
+  def registerMergeResult(shuffleId: Int, reduceId: Int, status: MergeStatus) {
+shuffleStatuses(shuffleId).addMergeResult(reduceId, status)
+  }
+
+  def registerMergeResults(shuffleId: Int, statuses: Seq[(Int, MergeStatus)]): 
Unit = {
+statuses.foreach {
+  case (reduceId, status) => registerMergeResult(shuffleId, reduceId, 
status)
+}
+  }
+
+  def unregisterMergeResult(shuffleId: Int, reduceId: Int, bmAddress: 
BlockManagerId) {

Review comment:
   OK. We can leave it to the follow-up PR since you already have a planned ticket for it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Ngone51 commented on a change in pull request #30312: [SPARK-32917][SHUFFLE][CORE] Adds support for executors to push shuffle blocks after successful map task completion

2020-11-30 Thread GitBox


Ngone51 commented on a change in pull request #30312:
URL: https://github.com/apache/spark/pull/30312#discussion_r533031771



##
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
##
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.File
+import java.net.ConnectException
+import java.nio.ByteBuffer
+import java.util.concurrent.ExecutorService
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
+
+import com.google.common.base.Throwables
+
+import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv}
+import org.apache.spark.annotation.Since
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
+import org.apache.spark.network.util.TransportConf
+import org.apache.spark.shuffle.ShuffleBlockPusher._
+import org.apache.spark.storage.{BlockId, BlockManagerId, ShufflePushBlockId}
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Used for pushing shuffle blocks to remote shuffle services when push 
shuffle is enabled.
+ * When push shuffle is enabled, it is created after the shuffle writer 
finishes writing the shuffle
+ * file and initiates the block push process.
+ *
+ * @param dataFile mapper generated shuffle data file
+ * @param partitionLengths array of shuffle block size so we can tell shuffle 
block
+ * boundaries within the shuffle file
+ * @param dep  shuffle dependency to get shuffle ID and the 
location of remote shuffle
+ * services to push local shuffle blocks
+ * @param partitionId  map index of the shuffle map task
+ * @param conf spark configuration
+ */
+@Since("3.1.0")
+private[spark] class ShuffleBlockPusher(
+dataFile: File,
+partitionLengths: Array[Long],
+dep: ShuffleDependency[_, _, _],
+partitionId: Int,
+conf: SparkConf) extends Logging {

Review comment:
   > For the constructor, I feel it's better to just pass in the spark conf 
object and let the ShuffleBlockPusher read the values of the configuration from 
it which it needs.
   
   Sounds fine. I actually do not have a strong opinion on this.
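   A minimal sketch of the design choice being agreed on here, with an illustrative class name and config key (not the PR's actual API): pass the `SparkConf` itself and let the pusher read whatever settings it needs, instead of threading each value through the constructor.
   ```
   import org.apache.spark.SparkConf

   // ToyBlockPusher and the config key below are stand-ins for illustration only.
   class ToyBlockPusher(conf: SparkConf) {
     // Read the needed settings from the conf inside the class rather than taking each
     // one as a separate constructor argument.
     private val maxBlockSizeToPush: Long =
       conf.getLong("spark.shuffle.push.maxBlockSizeToPush", 1L << 20)
   }

   val pusher = new ToyBlockPusher(new SparkConf())
   ```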





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AngersZhuuuu commented on pull request #30421: [SPARK-33474][SQL] Support TypeConstructed partition spec value

2020-11-30 Thread GitBox


AngersZh commented on pull request #30421:
URL: https://github.com/apache/spark/pull/30421#issuecomment-736169410


   Conflicts resolved. Are there any more updates?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AngersZhuuuu commented on pull request #30556: [WIP][SPARK-33212][BUILD] Provide hadoop-aws-shaded jar in hadoop-cloud module

2020-11-30 Thread GitBox


AngersZh commented on pull request #30556:
URL: https://github.com/apache/spark/pull/30556#issuecomment-736161647


   Users of the Hadoop S3A connector who run a custom version with custom code (such as me) can change the Hadoop build info, deploy it to their own Maven repo, and point the Spark dependencies at that repo. That would solve the problem for users who want to use their own jar, right? But it seems like a lot of work.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #30539: [SPARK-33592] Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading

2020-11-30 Thread GitBox


zhengruifeng commented on pull request #30539:
URL: https://github.com/apache/spark/pull/30539#issuecomment-736160622


   Merged to master. It seems that this fix cannot be directly backported to branch-3.0. @WeichenXu123 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng closed pull request #30539: [SPARK-33592] Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading

2020-11-30 Thread GitBox


zhengruifeng closed pull request #30539:
URL: https://github.com/apache/spark/pull/30539


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #30557: [SPARK-33613][PYTHON][TESTS] Replace deprecated APIs in pyspark tests

2020-11-30 Thread GitBox


HyukjinKwon closed pull request #30557:
URL: https://github.com/apache/spark/pull/30557


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #30557: [SPARK-33613][PYTHON][TESTS] Replace deprecated APIs in pyspark tests

2020-11-30 Thread GitBox


HyukjinKwon commented on pull request #30557:
URL: https://github.com/apache/spark/pull/30557#issuecomment-736158773


   Merged to master.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] AngersZhuuuu commented on pull request #30243: [SPARK-33335][SQL] Support `array_contains_array` func

2020-11-30 Thread GitBox


AngersZh commented on pull request #30243:
URL: https://github.com/apache/spark/pull/30243#issuecomment-736156071


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #30555: [SPARK-33608][SQL] Handle DELETE/UPDATE/MERGE in PullupCorrelatedPredicates

2020-11-30 Thread GitBox


HyukjinKwon commented on pull request #30555:
URL: https://github.com/apache/spark/pull/30555#issuecomment-736155845


   cc @maryannxue as well.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sarutak commented on pull request #30546: [SPARK-33607][SS][WEBUI] Input Rate timeline/histogram aren't rendered if built with Scala 2.13

2020-11-30 Thread GitBox


sarutak commented on pull request #30546:
URL: https://github.com/apache/spark/pull/30546#issuecomment-736154866


   @srowen 
   > I only wonder if this value is used elsewhere where a big positive value 
is unexpected.
   
   `inputTimeSec` is a local val and it's used only for `numRecords / inputTimeSec`, so I believe this change doesn't affect anything elsewhere.
   
   @HeartSaVioR 
   > Is the calculation numRecords / inputTimeSec (= Double.PositiveInfinity) 
consistent between 2.12 and 2.13? I guess you've experimented for both 
versions, but just to confirm again.
   
   Yes, I have confirmed that the result of `numRecords / Double.PositiveInfinity` is zero for both 2.12 and 2.13.
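   For reference, a quick plain-Scala check of the arithmetic being relied on here (no Spark required):
   ```
   // A finite record count divided by Double.PositiveInfinity is 0.0, so an "infinite"
   // input duration yields a zero input rate rather than NaN or an exception; per the
   // confirmation above, the result is the same on Scala 2.12 and 2.13.
   val numRecords = 1000L
   val inputTimeSec = Double.PositiveInfinity
   val inputRate = numRecords / inputTimeSec
   assert(inputRate == 0.0)
   ```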



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #30554: [SPARK-30098][SQL] Use default datasource as provider for CREATE TABLE command

2020-11-30 Thread GitBox


HyukjinKwon commented on pull request #30554:
URL: https://github.com/apache/spark/pull/30554#issuecomment-736153495


   I don't think this is something banned as long as we have a way to restore the previous behaviour. More importantly, in Apache Spark, `CREATE TABLE` should create a Spark table, not a Hive table.
   
   _All_ items in the migration guide are behaviour changes, since we don't usually list bug fixes in the migration guide. I can provide a bunch of similar examples if you doubt that.
   
   So I believe the point we should discuss and focus on here is that this is a significant behaviour change, since `CREATE TABLE` is the very entry point for users. Personally the vote sounds fine to me, given that two committers think it might be needed.
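   To make the behaviour difference concrete without relying on the default, a hedged sketch (table names are hypothetical, and the `STORED AS` form requires Hive support to be enabled): spelling out the provider sidesteps whatever a bare `CREATE TABLE` defaults to.
   ```
   // Illustrative only: choosing the provider explicitly avoids depending on the default
   // under discussion.
   spark.sql("CREATE TABLE t_native (id INT) USING parquet")    // a Spark data source table
   spark.sql("CREATE TABLE t_hive (id INT) STORED AS PARQUET")  // a Hive serde table
   ```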



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon closed pull request #30498: [SPARK-33556][ML] Add array_to_vector function for dataframe column

2020-11-30 Thread GitBox


HyukjinKwon closed pull request #30498:
URL: https://github.com/apache/spark/pull/30498


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #30498: [SPARK-33556][ML] Add array_to_vector function for dataframe column

2020-11-30 Thread GitBox


HyukjinKwon commented on pull request #30498:
URL: https://github.com/apache/spark/pull/30498#issuecomment-736145317


   Merged to master.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] github-actions[bot] commented on pull request #29097: [SPARK-32299] [SQL] Decide SMJ Join Orientation adaptively

2020-11-30 Thread GitBox


github-actions[bot] commented on pull request #29097:
URL: https://github.com/apache/spark/pull/29097#issuecomment-736143670


   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] github-actions[bot] closed pull request #29511: [WIP][POC][SPARK-32642][K8S] Add an external shuffle service as a sidecar option for Kubernetes

2020-11-30 Thread GitBox


github-actions[bot] closed pull request #29511:
URL: https://github.com/apache/spark/pull/29511


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WeichenXu123 commented on pull request #30539: [SPARK-33592] Fix: Pyspark ML Validator params in estimatorParamMaps may be lost after saving and reloading

2020-11-30 Thread GitBox


WeichenXu123 commented on pull request #30539:
URL: https://github.com/apache/spark/pull/30539#issuecomment-736142470


   > Looks OK - test cleanup looks like a win too. Can the refactored logic be 
reused anywhere else?
   
   Yes. The MetaAlgorithmReadWrite code will be reused in the PR 
https://github.com/apache/spark/pull/30471



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

2020-11-30 Thread GitBox


BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r532996042



##
File path: python/pyspark/sql/tests/test_arrow.py
##
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
 pdf_arrow = df.toPandas()
 assert_frame_equal(pdf_arrow, pdf)
 
+def test_pandas_self_destruct(self):
+import pyarrow as pa
+rows = 2 ** 16
+cols = 8
+df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+expected_bytes = rows * cols * 8
+with 
self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+# We hold on to the table reference here, so if self destruct 
didn't work, then
+# there would be 2 copies of the data (one in Arrow, one in 
Pandas), both
+# tracked by the Arrow allocator

Review comment:
   Oh, I missed the comment that you need the Table reference. Otherwise it could be freed and then we would lose track of the memory allocated, right? If that's the case, what about just calling `_collect_as_arrow(_force_split_batches=True)` here and performing batches -> table -> pandas directly?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] BryanCutler commented on a change in pull request #29818: [SPARK-32953][PYTHON] Add Arrow self_destruct support to toPandas

2020-11-30 Thread GitBox


BryanCutler commented on a change in pull request #29818:
URL: https://github.com/apache/spark/pull/29818#discussion_r532991912



##
File path: python/pyspark/sql/tests/test_arrow.py
##
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
 pdf_arrow = df.toPandas()
 assert_frame_equal(pdf_arrow, pdf)
 
+def test_pandas_self_destruct(self):
+import pyarrow as pa
+rows = 2 ** 16
+cols = 8
+df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+expected_bytes = rows * cols * 8
+with 
self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+# We hold on to the table reference here, so if self destruct 
didn't work, then
+# there would be 2 copies of the data (one in Arrow, one in 
Pandas), both
+# tracked by the Arrow allocator
+pdf, table = df._collect_as_arrow_table()
+self.assertEqual((rows, cols), pdf.shape)
+# If self destruct did work, then memory usage should be only a 
little above
+# the minimum memory necessary for the dataframe
+self.assertLessEqual(pa.total_allocated_bytes(), 1.2 * 
expected_bytes)

Review comment:
   Just to be safe, what do you think about getting the allocated bytes before and after, then comparing the difference? It should probably be the same, but it would be a little more focused in case something changes in the future.

##
File path: python/pyspark/sql/tests/test_arrow.py
##
@@ -191,6 +191,32 @@ def test_pandas_round_trip(self):
 pdf_arrow = df.toPandas()
 assert_frame_equal(pdf_arrow, pdf)
 
+def test_pandas_self_destruct(self):
+import pyarrow as pa
+rows = 2 ** 16
+cols = 8
+df = self.spark.range(0, rows).select(*[rand() for _ in range(cols)])
+expected_bytes = rows * cols * 8
+with 
self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": True}):
+# We hold on to the table reference here, so if self destruct 
didn't work, then
+# there would be 2 copies of the data (one in Arrow, one in 
Pandas), both
+# tracked by the Arrow allocator
+pdf, table = df._collect_as_arrow_table()
+self.assertEqual((rows, cols), pdf.shape)
+# If self destruct did work, then memory usage should be only a 
little above
+# the minimum memory necessary for the dataframe
+self.assertLessEqual(pa.total_allocated_bytes(), 1.2 * 
expected_bytes)
+del pdf, table
+self.assertEqual(pa.total_allocated_bytes(), 0)
+with 
self.sql_conf({"spark.sql.execution.arrow.pyspark.selfDestruct.enabled": 
False}):
+# Force the internals to reallocate data via PyArrow's allocator 
so that it
+# gets measured by total_allocated_bytes
+pdf, table = df._collect_as_arrow_table(_force_split_batches=True)
+total_allocated_bytes = pa.total_allocated_bytes()

Review comment:
   looks to be unused?

##
File path: python/pyspark/sql/pandas/conversion.py
##
@@ -252,6 +242,74 @@ def _collect_as_arrow(self):
 # Re-order the batch list using the correct order
 return [batches[i] for i in batch_order]
 
+def _collect_as_arrow_table(self, _force_split_batches=False):

Review comment:
   I don't think we really need to add this method; it just does the conversion from batches to a DataFrame. The unit test doesn't even use the returned Table, right?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


