[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207846166
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1930,6 +1930,12 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.tryLogNonFatalError {
       _executorAllocationManager.foreach(_.stop())
     }
+    if (_dagScheduler != null) {
--- End diff --

Add a comment 


---
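
For context, a minimal sketch of the user-facing API this PR builds toward (assuming the barrier execution mode described in SPARK-24817; exact signatures may differ from the code under review):

```scala
import org.apache.spark.{BarrierTaskContext, SparkContext}

object BarrierSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[4]", "barrier-sketch")
    // barrier() marks the stage as a barrier stage: all four tasks are
    // scheduled together, and each one blocks inside ctx.barrier() until
    // every task in the stage attempt has reached the same call.
    val result = sc.parallelize(1 to 100, 4).barrier().mapPartitions { iter =>
      val ctx = BarrierTaskContext.get()
      // ...phase 1 work...
      ctx.barrier() // the global sync that BarrierCoordinator serves
      // ...phase 2 work, which starts only after every task finished phase 1...
      iter.map(_ * 2)
    }.collect()
    println(result.sum)
    sc.stop()
  }
}
```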




[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-06 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207845712
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.{Timer, TimerTask}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Consumer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted}
+
+/**
+ * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus
+ * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is
+ * from.
+ */
+private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) {
+  override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
+}
+
+/**
+ * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync
+ * request is generated by `BarrierTaskContext.barrier()`, and identified by
+ * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon
+ * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to
+ * collect enough global sync requests within a configured time, fail all the requests and return
+ * an Exception with timeout message.
+ */
+private[spark] class BarrierCoordinator(
+    timeoutInSecs: Long,
+    listenerBus: LiveListenerBus,
+    override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging {
+
+  private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer")
--- End diff --

Add a comment above this line?


---
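
To make the epoch/timeout protocol in the class doc above concrete, here is a self-contained sketch of the per-stage-attempt state machine (hypothetical names, not the PR's code): the first request of an epoch arms a timer, the `numTasks`-th request releases all blocked callers, and a firing timer fails them all.

```scala
import java.util.{Timer, TimerTask}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for RpcCallContext: something we can reply to or fail.
trait Requester {
  def reply(msg: Any): Unit
  def sendFailure(e: Throwable): Unit
}

class BarrierStateSketch(numTasks: Int, timeoutInSecs: Long) {
  private var barrierEpoch = 0
  private val requesters = new ArrayBuffer[Requester](numTasks)
  private val timer = new Timer("barrier-timeout-sketch")
  private var timerTask: TimerTask = null

  // Called once per task for each barrier() round.
  def handleRequest(requester: Requester): Unit = synchronized {
    if (requesters.isEmpty) {
      // First request of this epoch: arm the timeout.
      timerTask = new TimerTask {
        override def run(): Unit = failAll(
          new RuntimeException(s"barrier epoch $barrierEpoch timed out"))
      }
      timer.schedule(timerTask, timeoutInSecs * 1000)
    }
    requesters += requester
    if (requesters.size == numTasks) {
      // All tasks arrived: release every blocked caller and bump the epoch.
      requesters.foreach(_.reply(()))
      requesters.clear()
      timerTask.cancel()
      barrierEpoch += 1
    }
  }

  private def failAll(e: Throwable): Unit = synchronized {
    requesters.foreach(_.sendFailure(e))
    requesters.clear()
    barrierEpoch = 0 // reset on failure, as the quoted comments describe
  }
}
```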




[GitHub] spark issue #21898: [SPARK-24817][Core] Implement BarrierTaskContext.barrier...

2018-08-06 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21898
  
ok to test


---




[GitHub] spark issue #21998: [SPARK-24940][SQL] Use IntegerLiteral in ResolveCoalesce...

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21998
  
ok to test


---




[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207732505
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+/**
+ * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use
+ * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from.
+ */
+private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) {
+  override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
+}
+
+/**
+ * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync
+ * request is generated by `BarrierTaskContext.barrier()`, and identified by
+ * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon
+ * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect
+ * enough global sync requests within a configured time, fail all the requests due to timeout.
+ */
+private[spark] class BarrierCoordinator(
+    timeout: Int,
+    listenerBus: LiveListenerBus,
+    override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging {
+
+  private val timer = new Timer("BarrierCoordinator barrier epoch increment timer")
+
+  // Listen to StageCompleted event, clear corresponding ContextBarrierState.
+  private val listener = new SparkListener {
+    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
+      val stageInfo = stageCompleted.stageInfo
+      val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber)
+      // Clear ContextBarrierState from a finished stage attempt.
+      cleanupBarrierStage(barrierId)
+    }
+  }
+
+  // Remember all active stage attempts that make barrier() call(s), and the corresponding
+  // internal state.
+  private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState]
+
+  override def onStart(): Unit = {
+    super.onStart()
+    listenerBus.addToStatusQueue(listener)
+  }
+
+  /**
+   * Provide current state of a barrier() call, the state is created when a new stage attempt send
+   * out a barrier() call, and recycled on stage completed.
+   *
+   * @param barrierId Identifier of the barrier stage that make a barrier() call.
+   * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall
+   *                 collect `numTasks` requests to succeed.
+   */
+  private class ContextBarrierState(
+      val barrierId: ContextBarrierId,
+      val numTasks: Int) {
+
+    // There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used
+    // to identify each barrier() call. It shall get increased when a barrier() call succeed, or
+    // reset when a barrier() call fail due to timeout.
+    private var barrierEpoch: Int = 0
+
+    // An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier()
+    // call.
+    private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks)
+
+    // A timer task that ensures we may timeout for a barrier() call.
+    private var timerTask: TimerTask = null
+
+    // Init a TimerTask for a barrier() call

[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207732072
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+private[spark] class BarrierCoordinator(
+    timeout: Int,
--- End diff --

-> `timeoutInSecs`


---




[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207731918
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+    // A timer task that ensures we may timeout for a barrier() call.
+    private var timerTask: TimerTask = null
+
+    // Init a TimerTask for a barrier() call

[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207731620
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+  /**
+   * Provide current state of a barrier() call, the state is created when a new stage attempt send
--- End diff --

`Provide current state of a barrier() call, the state`
-> `Provide the current state of a barrier() call. The state`


---




[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207731501
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+  // Remember all active stage attempts that make barrier() call(s), and the corresponding
--- End diff --

`Remember` -> `Record`


---




[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207731274
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+ * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon
+ * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect
--- End diff --

`doesn't collect` -> `is unable to collect`


---




[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207731107
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+    // A timer task that ensures we may timeout for a barrier() call.
+    private var timerTask: TimerTask = null
+
+    // Init a TimerTask for a barrier() call

[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207731198
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+/**
+ * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use
+ * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from.
--- End diff --

```Scala
Since only one barrier() call is triggered by each barrier stage attempt, we use
(stageId, stageAttemptId) to identify the stage attempt which the barrier() call is from.
```


---




[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207731249
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+ * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon
+ * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect
--- End diff --

`received all the requests for a group of `barrier()` calls`
->
`all the requests for a group of `barrier()` calls are received`


---




[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207731446
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+ * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect
+ * enough global sync requests within a configured time, fail all the requests due to timeout.
--- End diff --

`due to timeout` -> `and return a timeout exception`


---




[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207730912
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+    // A timer task that ensures we may timeout for a barrier() call.
+    private var timerTask: TimerTask = null
+
+    // Init a TimerTask for a barrier() call

[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207730852
  
--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
+    // A timer task that ensures we may timeout for a barrier() call.
+    private var timerTask: TimerTask = null
+
+    // Init a TimerTask for a barrier() call

[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21898#discussion_r207730025
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -567,4 +567,14 @@ package object config {
       .intConf
       .checkValue(v => v > 0, "The value should be a positive integer.")
       .createWithDefault(2000)
+
+  private[spark] val BARRIER_SYNC_TIMEOUT =
+    ConfigBuilder("spark.barrier.sync.timeout")
+      .doc("The timeout in seconds for each barrier() call from a barrier task. If the " +
+        "coordinator didn't receive all the sync messages from barrier tasks within the " +
+        "configured time, throw a SparkException to fail all the tasks. The default value is " +
+        "set to 31536000 (3600 * 24 * 365) so the barrier() call shall wait for one year.")
+      .intConf
--- End diff --

`.timeConf(TimeUnit.SECONDS)`?


---
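
For reference, a hedged sketch of the `timeConf` variant suggested above, written against the internal `ConfigBuilder` API quoted in the diff (illustrative only, not the PR's final code; it would sit inside the same `package object config`):

```scala
import java.util.concurrent.TimeUnit

// Sketch: a time-typed config parses values with units and yields seconds.
private[spark] val BARRIER_SYNC_TIMEOUT =
  ConfigBuilder("spark.barrier.sync.timeout")
    .doc("The timeout for each barrier() call from a barrier task.")
    .timeConf(TimeUnit.SECONDS)
    .checkValue(v => v > 0, "The value should be a positive time value.")
    .createWithDefaultString("365d")
```

With `timeConf`, users can write values like `365d` or `3600s`, and the magic number 31536000 disappears from the doc string.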




[GitHub] spark pull request #21994: [SPARK-24529][Build][test-maven][follow-up] Add s...

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21994#discussion_r207729852
  
--- Diff: pom.xml ---
@@ -2609,6 +2609,28 @@
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
--- End diff --

Actually, after adding this plugin, I was unable to perform parallel builds.

https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3


---




[GitHub] spark pull request #21994: [SPARK-24529][Build][test-maven][follow-up] Add s...

2018-08-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21994#discussion_r207729865
  
--- Diff: pom.xml ---
@@ -2609,6 +2609,28 @@
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
--- End diff --
--- End diff --

It sounds like this plugin is not thread-safe.


---




[GitHub] spark issue #21948: [SPARK-24991][SQL] use InternalRow in DataSourceWriter

2018-08-04 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21948
  
retest this please



---




[GitHub] spark issue #17185: [SPARK-19602][SQL] Support column resolution of fully qu...

2018-08-04 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17185
  
retest this please


---




[GitHub] spark issue #21982: [SPARK-23911][SQL] Add aggregate function.

2018-08-04 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21982
  
retest this please


---




[GitHub] spark issue #21982: [SPARK-23911][SQL] Add aggregate function.

2018-08-04 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21982
  
@ueshin  You need to address the conflicts again. :)


---




[GitHub] spark pull request #21970: [SPARK-24996][SQL] Use DSL in DeclarativeAggregat...

2018-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21970#discussion_r207701793
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
@@ -111,23 +111,23 @@ abstract class CentralMomentAgg(child: Expression)
     val delta2 = delta * delta
     val deltaN2 = deltaN * deltaN
     val newM3 = if (momentOrder >= 3) {
-      m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
+      m3 - deltaN * 3.0 * newM2 + delta * (delta2 - deltaN2)
--- End diff --

Please avoid these changes


---




[GitHub] spark pull request #21403: [SPARK-24341][SQL] Support only IN subqueries wit...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21403#discussion_r207701674
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -505,6 +505,7 @@ object NullPropagation extends Rule[LogicalPlan] {
 
       // If the value expression is NULL then transform the In expression to null literal.
       case In(Literal(null, _), _) => Literal.create(null, BooleanType)
+      case InSubquery(Seq(Literal(null, _)), _) => Literal.create(null, BooleanType)
--- End diff --

Add a test case in OptimizeInSuite


---
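
Why the fold is safe: under SQL's three-valued logic, `NULL IN (...)` can never evaluate to TRUE or FALSE, so the whole predicate collapses to NULL. A tiny standalone model of that semantics (illustrative only, not Catalyst code):

```scala
// Option[Boolean] stands in for SQL's TRUE / FALSE / NULL (unknown).
def sqlIn(value: Option[Int], list: Seq[Option[Int]]): Option[Boolean] =
  value match {
    case None => None // null IN (...) is always null: the case the new rule folds
    case Some(v) =>
      if (list.exists(_.contains(v))) Some(true) // definite match
      else if (list.exists(_.isEmpty)) None      // a null member might still match
      else Some(false)                           // definitely absent
  }

// sqlIn(None, Seq(Some(1)))          == None
// sqlIn(Some(1), Seq(Some(1), None)) == Some(true)
// sqlIn(Some(3), Seq(Some(1), None)) == None
```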




[GitHub] spark pull request #21403: [SPARK-24341][SQL] Support only IN subqueries wit...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21403#discussion_r207701622
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala ---
@@ -154,7 +154,7 @@ class ExpressionParserSuite extends PlanTest {
   test("in sub-query") {
 assertEqual(
   "a in (select b from c)",
-  In('a, Seq(ListQuery(table("c").select('b)
+  InSubquery(Seq('a), ListQuery(table("c").select('b
--- End diff --

Could you add more cases in this test case? For example, when the input is 
CreateNamedStruct


---
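
A hedged sketch of one extra case along the requested lines, a multi-column IN whose value side comes from a struct of columns, written against the suite's DSL above (the exact parsed form in the PR may differ):

```scala
assertEqual(
  "(a, b) in (select c, d from e)",
  InSubquery(Seq('a, 'b), ListQuery(table("e").select('c, 'd))))
```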




[GitHub] spark pull request #21403: [SPARK-24341][SQL] Support only IN subqueries wit...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21403#discussion_r207701506
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -505,6 +505,7 @@ object NullPropagation extends Rule[LogicalPlan] {
 
       // If the value expression is NULL then transform the In expression to null literal.
       case In(Literal(null, _), _) => Literal.create(null, BooleanType)
+      case InSubquery(Seq(Literal(null, _)), _) => Literal.create(null, BooleanType)
--- End diff --

Thanks for adding this! Please double check all the cases of `IN` in all 
the optimizer rules. We are afraid this new expression might introduce a 
regression. 


---




[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21403
  
retest this please


---




[GitHub] spark issue #21909: [SPARK-24959][SQL] Speed up count() for JSON and CSV

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21909
  
Please document it in the migration guide. 


---




[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r207701331
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1476,6 +1476,14 @@ object SQLConf {
 "are performed before any UNION, EXCEPT and MINUS operations.")
   .booleanConf
   .createWithDefault(false)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA = buildConf("spark.sql.bypassParserForEmptySchema")
--- End diff --

Let us get rid of this in the next release. Mark it as an internal conf and use the legacy naming scheme.


---
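
A hedged sketch of the suggested shape, an internal flag under a legacy-style name (the conf name here is an assumption, not the PR's final choice):

```scala
val BYPASS_PARSER_FOR_EMPTY_SCHEMA =
  buildConf("spark.sql.legacy.bypassParserForEmptySchema")
    .internal() // hidden from docs: an escape hatch slated for removal
    .doc("When true, skip running the parser when the required schema is empty, " +
      "e.g. for count(). Legacy option; to be removed in a future release.")
    .booleanConf
    .createWithDefault(true)
```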




[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r207701260
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
@@ -0,0 +1,205 @@
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with FileSchemaPruningTest
+with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+id: Int,
+name: FullName,
+address: String,
+pets: Int,
+friends: Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  val contacts =
+Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+  relatives = Map("brother" -> johnDoe)) ::
+Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> 
janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  val briefContacts =
+BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+id: Int,
+name: FullName,
+address: String,
+pets: Int,
+friends: Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map(),
+p: Int)
+
+  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, 
address: String, p: Int)
+
+  val contactsWithDataPartitionColumn =
+contacts.map { case Contact(id, name, address, pets, friends, 
relatives) =>
+  ContactWithDataPartitionColumn(id, name, address, pets, friends, 
relatives, 1) }
+  val briefContactsWithDataPartitionColumn =
+briefContacts.map { case BriefContact(id, name, address) =>
+  BriefContactWithDataPartitionColumn(id, name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+val query = sql("select name.middle from contacts order by id")
+checkScanSchemata(query, "struct<name:struct<middle:string>>")
+checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: 
Nil)
+  }
+
+  testSchemaPruning("select a single complex field and its parent struct") 
{
+val query = sql("select name.middle, name from contacts order by id")
+checkScanSchemata(query, 
"struct<name:struct<first:string,middle:string,last:string>>")
+checkAnswer(query,
+  Row("X.", Row("Jane", "X.", "Doe")) ::
+  Row("Y.", Row("John", "Y.", "Doe")) ::
+  Row(null, Row("Janet", null, "Jones")) ::
+  Row(null, Row("Jim", null, "Jones")) ::
+  Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and its parent 
struct array") {
+val query = sql("select friends.middle, friends from contacts where 
p=1 order by id")
+checkScanSchemata(query,
+  
"struct<friends:array<struct<first:string,middle:string,last:string>>>")
+checkAnswer(query,
+  Row(Array("Z."

[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21911
  
You can address the comment in the follow-up PR. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21911
  
LGTM

Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint ...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21911#discussion_r207701114
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala
 ---
@@ -102,6 +104,32 @@ object ResolveHints {
 }
   }
 
+  /**
+   * COALESCE Hint accepts name "COALESCE" and "REPARTITION".
+   * Its parameter includes a partition number.
+   */
+  object ResolveCoalesceHints extends Rule[LogicalPlan] {
+private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION")
+
+def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+  case h: UnresolvedHint if 
COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+val hintName = h.name.toUpperCase(Locale.ROOT)
+val shuffle = hintName match {
+  case "REPARTITION" => true
+  case "COALESCE" => false
+}
+val numPartitions = h.parameters match {
+  case Seq(Literal(numPartitions: Int, IntegerType)) =>
--- End diff --

Use `IntegerLiteral`
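
`IntegerLiteral` is the existing extractor in 
`org.apache.spark.sql.catalyst.expressions` that matches 
`Literal(v: Int, IntegerType)`, so the match could be written roughly as:

```
val numPartitions = h.parameters match {
  case Seq(IntegerLiteral(n)) => n
  case _ =>
    throw new AnalysisException(s"$hintName Hint expects a partition number as a parameter")
}
```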


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21993: [SPARK-24983][Catalyst] Add configuration for maximum nu...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21993
  
@dvogelbacher Currently, in the master branch (2.4 release), you have a 
workaround. Add CollapseProject to `spark.sql.optimizer.excludedRules` before 
such queries.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21993: [SPARK-24983][Catalyst] Add configuration for maximum nu...

2018-08-03 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21993
  
Let us blacklist CASE WHEN in CollapseProject, instead of introducing this 
new conf.  
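
A rough sketch of the idea (the helper name and its placement are 
illustrative, not the final rule):

```
// In CollapseProject, skip merging two adjacent Projects whenever a referenced
// alias contains a CaseWhen, so the conditional is not duplicated per reference.
private def containsCaseWhen(e: Expression): Boolean =
  e.find(_.isInstanceOf[CaseWhen]).isDefined
```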


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21965: [SPARK-23909][SQL] Add filter function.

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21965
  
cc @hvanhovell 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21941
  
LGTM 

Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21964: [SPARK-24788][SQL] RelationalGroupedDataset.toString wit...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21964
  
How about KeyValueGroupedDataset?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21932: [SPARK-24979][SQL] add AnalysisHelper#resolveOperatorsUp

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21932
  
It sounds reasonable to me. cc @rxin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21754: [SPARK-24705][SQL] ExchangeCoordinator broken when dupli...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21754
  
Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21965: [WIP][SPARK-23909][SQL] Add filter function.

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21965
  
@ueshin Please rebase it. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21977
  
cc @ueshin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21941
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21911
  
> Which test suite is a good place to add such an end-to-end case?

`org.apache.spark.sql.SQLQuerySuite` might be the best place.

> Do we plan to support such call as df.hint("COALESCE", Seq(10))? Why not 
just use df.coalesce(10)?

It does not make sense to block it either. Since this is a new hint, we 
should check whether it works. 
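
A minimal end-to-end sketch for SQLQuerySuite (hypothetical test body):

```
test("SPARK-24940: coalesce and repartition hints") {
  withTempView("t") {
    spark.range(100).createOrReplaceTempView("t")
    assert(sql("SELECT /*+ COALESCE(2) */ * FROM t").rdd.getNumPartitions <= 2)
    assert(sql("SELECT /*+ REPARTITION(7) */ * FROM t").rdd.getNumPartitions == 7)
  }
}
```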


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20611
  
> Yes there is a change in the behavior, As i mentioned above in 
descriptions now we will be able to support wildcard even in the folder level 
for local file systems. Previous versions will throw exception in such 
scenarios.

Anything else?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21754: [SPARK-24705][SQL] ExchangeCoordinator broken when dupli...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21754
  
Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21973: [BUILD] Fix lint-python.

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21973
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21911
  
cc @maryannxue Please review it. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21911
  
Generally, it looks good to me. Let us target this for the upcoming 2.4 
release.

We need to improve the test coverage:
- Add a test case when users specify multiple REPARTITION, COALESCE hints  
`... INSERT INTO s SELECT /*+ REPARTITION(100), COALESCE(500), COALESCE(10) */ 
* FROM t ...`
- Add a test case when users specify BROADCAST and REPARTITION hints at the 
same time 
- Add an end-to-end test case. 
- Add a unit test case in ResolveHintsSuite.scala to verify the hint names 
are case insensitive.
- Add more negative test cases. For example, `"SELECT /*+ COALESCE(1.0) */ 
* FROM t"` and `"SELECT /*+ COALESCE(3 + 4) */ * FROM t"`
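
For the negative cases, a hypothetical shape, assuming hint resolution 
raises an AnalysisException for non-integer-literal parameters:

```
val e = intercept[AnalysisException] {
  sql("SELECT /*+ COALESCE(1.0) */ * FROM t").collect()
}
assert(e.getMessage.contains("COALESCE"))
```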


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21941
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21969: [SPARK-24945][SQL] Switching to uniVocity 2.7.3

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21969
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21935: [SPARK-24773] Avro: support logical timestamp type with ...

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21935
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21954
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21962: [SPARK-24865][FOLLOW-UP] Remove AnalysisBarrier LogicalP...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21962
  
cc @rxin @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21951: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21951
  
https://issues.apache.org/jira/browse/SPARK-24996 is created.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21962: [SPARK-24865] Remove AnalysisBarrier LogicalPlan ...

2018-08-01 Thread gatorsmile
GitHub user gatorsmile opened a pull request:

https://github.com/apache/spark/pull/21962

[SPARK-24865] Remove AnalysisBarrier LogicalPlan Node

## What changes were proposed in this pull request?
Remove the AnalysisBarrier LogicalPlan node, which is useless now.

## How was this patch tested?
N/A

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gatorsmile/spark refactor2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21962.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21962


commit 7f70aaadbad680f41b4b3f42798c2f64e94e1e6a
Author: Xiao Li 
Date:   2018-08-02T06:03:41Z

clean




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21951: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21951
  
Thanks! Merged to master. 

Please ignore the last commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19449: [SPARK-22219][SQL] Refactor code to get a value for "spa...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19449
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21951: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21951
  
This will simplify the code and improve readability. We can do the same 
in the other expressions. 
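
Illustrative only (not the PR's exact code): with the Catalyst dsl in 
scope, the buffer expressions can be written with operators instead of 
explicit node constructors.

```
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{DoubleType, LongType}

// Placeholder buffer attributes standing in for the aggregate's internal state:
val sum = AttributeReference("sum", DoubleType)()
val count = AttributeReference("count", LongType)()

// Divide(sum, Cast(count, DoubleType)) becomes:
val evaluateExpression = sum / count.cast(DoubleType)
```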


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21958: [minor] remove dead code in ExpressionEvalHelper

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21958
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19449: [SPARK-22219][SQL] Refactor code to get a value for "spa...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19449
  
retest this please



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21752: [SPARK-24788][SQL] fixed UnresolvedException when toStri...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21752
  
@maropu Maybe you take it over?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21954
  
cc @hvanhovell 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21951: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21951
  
cc @hvanhovell @rednaxelafx @cloud-fan  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21951: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for ...

2018-08-01 Thread gatorsmile
GitHub user gatorsmile opened a pull request:

https://github.com/apache/spark/pull/21951

[SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE

## What changes were proposed in this pull request?
This PR is to refactor the code in AVERAGE by dsl.

## How was this patch tested?
N/A

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gatorsmile/spark refactor1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21951.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21951


commit 511b7b6bbf4e22092c024dd0deb58daa6bb23b4b
Author: Xiao Li 
Date:   2018-08-02T00:24:05Z

clean up




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21087: [SPARK-23997][SQL] Configurable maximum number of bucket...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21087
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21949: [SPARK-24957][SQL][BACKPORT-2.2] Average with decimal fo...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21949
  
Thanks! Merged to 2.2. 

Could you close this PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21898: [SPARK-24817][Core] Implement BarrierTaskContext.barrier...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21898
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21915: [SPARK-24954][Core] Fail fast on job submit if run a bar...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21915
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21946
  
@rdblue This change is pretty isolated, and it also looks good to me. 

Since you are fine with the change, I am assuming you are not blocking 
this. I will merge this soon. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21892: [SPARK-24945][SQL] Switching to uniVocity 2.7.2

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21892
  
Great! Let us wait for the 2.7.3 build. @jbax When will it be released?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r207032024
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
+ } else {
+   rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
--- End diff --

Both? 

If we introduce a behavior change, we need to document it in the migration 
guide and add a conf. Users can then set the conf to revert to the previous 
behavior. 
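
A sketch of the wiring, using the conf defined earlier in this PR (the 
exact call site is illustrative):

```
// Thread the conf through when constructing the parser, so that setting it to
// false restores full parsing even for an empty required schema:
val optimizeEmptySchema =
  sparkSession.sessionState.conf.getConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA)
// then, inside FailureSafeParser, as in the diff above:
// private val skipParsing = optimizeEmptySchema && schema.isEmpty
```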


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19449: [SPARK-22219][SQL] Refactor code to get a value for "spa...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19449
  
LGTM pending Jenkins.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21883: [SPARK-24937][SQL] Datasource partition table should loa...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21883
  
LGTM

Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207020824
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -78,7 +93,8 @@ object CommandUtils extends Logging {
   val size = if (fileStatus.isDirectory) {
 fs.listStatus(path)
   .map { status =>
-if (!status.getPath.getName.startsWith(stagingDir)) {
+if (!status.getPath.getName.startsWith(stagingDir) &&
+  DataSourceUtils.isDataPath(path)) {
--- End diff --

This is also a behavior change. Could you document it too?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21941#discussion_r207019185
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1451,6 +1451,15 @@ object SQLConf {
 .intConf
 .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
 .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val SETOPS_PRECEDENCE_ENFORCED =
+buildConf("spark.sql.setops.precedence.enforced")
+  .doc("When set to true and order of evaluation is not specified by 
parentheses, " +
+"INTERSECT operations are performed before any UNION, EXCEPT amd 
MINUS operations. " +
--- End diff --

Typo: "amd" should be "and".


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21941#discussion_r207018987
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1451,6 +1451,15 @@ object SQLConf {
 .intConf
 .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
 .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val SETOPS_PRECEDENCE_ENFORCED =
+buildConf("spark.sql.setops.precedence.enforced")
+  .doc("When set to true and order of evaluation is not specified by 
parentheses, " +
+"INTERSECT operations are performed before any UNION, EXCEPT amd 
MINUS operations. " +
+"When set to false and the order of evaluation is not specified by 
parentheses, the" +
+"set operations are performed from left to right as they appear in 
the query.")
+  .booleanConf
--- End diff --

let us mark it internal. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21941#discussion_r207018914
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1451,6 +1451,15 @@ object SQLConf {
 .intConf
 .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
 .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val SETOPS_PRECEDENCE_ENFORCED =
+buildConf("spark.sql.setops.precedence.enforced")
--- End diff --

spark.sql.legacy.setopsPrecedence.enabled
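
Note that with a `legacy.*` name the polarity would likely flip, so that 
`true` selects the old left-to-right behavior; a sketch:

```
val SETOPS_PRECEDENCE_ENFORCED =
  buildConf("spark.sql.legacy.setopsPrecedence.enabled")
    .internal()
    .doc("When set to true and the order of evaluation is not specified by parentheses, " +
      "the set operations are performed from left to right as they appear in the query. " +
      "When set to false and the order of evaluation is not specified by parentheses, " +
      "INTERSECT operations are performed before any UNION, EXCEPT and MINUS operations.")
    .booleanConf
    .createWithDefault(false)
```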


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207017248
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1449,13 @@ object SQLConf {
 .intConf
 .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
 .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
+buildConf("spark.sql.execution.computeStatsListFilesInParallel")
+  .internal()
+  .doc("If True, File listing for compute statistics is done in 
parallel.")
--- End diff --

When true, SQL commands use parallel file listing, as opposed to single 
thread listing. This usually speeds up commands that need to list many 
directories.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21608#discussion_r207017122
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1449,6 +1449,13 @@ object SQLConf {
 .intConf
 .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
 .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
+buildConf("spark.sql.execution.computeStatsListFilesInParallel")
--- End diff --

How about `spark.sql.parallelFileListingInCommands.enabled`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21921
  
It sounds like GitHub is experiencing a very bad delay. @cloud-fan Could 
you submit a follow-up PR to address the comments from @rdblue?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21921
  
@rdblue I do not think it is documented. Let us be more conservative and 
collect an LGTM from the committers, no matter whether the PR author is a 
committer or not. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21909#discussion_r206985104
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
 ---
@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
 }
   }
 
+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
 try {
-  rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
+ if (skipParsing) {
+   Iterator.single(InternalRow.empty)
+ } else {
+   rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), 
() => null))
--- End diff --

Could you add a test case for counting both CSV and JSON sources when the 
files have broken records? Is there any behavior change after this PR?
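
A hypothetical shape for such a test (file contents and expectations 
illustrative; assumes `testImplicits._` is imported as in the other suites):

```
test("SPARK-24959: count() over files with broken records") {
  withTempPath { dir =>
    val path = dir.getCanonicalPath
    Seq("""{"a": 1}""", """{"a":""").toDS().write.text(path)
    // The fast path skips parsing when the required schema is empty, so this
    // must still match the pre-PR PERMISSIVE-mode result (one row per line).
    assert(spark.read.json(path).count() == 2)
  }
}
```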


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21921
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow

2018-08-01 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21921
  
@cloud-fan To be safe, let us get one more LGTM from another committer. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21941#discussion_r206763936
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1451,6 +1451,15 @@ object SQLConf {
 .intConf
 .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
 .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val SETOPS_PRECEDENCE_ENFORCED =
+buildConf("spark.sql.setops.precedence.enforced")
--- End diff --

let me think about the name of the conf


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21941#discussion_r206763732
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1451,6 +1451,15 @@ object SQLConf {
 .intConf
 .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
 .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val SETOPS_PRECEDENCE_ENFORCED =
+buildConf("spark.sql.setops.precedence.enforced")
+  .doc("When set to true and order of evaluation is not specified by 
parentheses, " +
+"INTERSECT operations are performed before any UNION or EXCEPT 
operations. " +
--- End diff --

also include MINUS


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21941#discussion_r206763501
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -676,4 +677,42 @@ class PlanParserSuite extends AnalysisTest {
   OneRowRelation().select('rtrim.function("c&^,.", "bc...,,,&&&ccc"))
 )
   }
+
+  test("precedence of set operations") {
+val a = table("a").select(star())
+val b = table("b").select(star())
+val c = table("c").select(star())
+val d = table("d").select(star())
+
+val query1 =
+  """
+|SELECT * FROM a
+|UNION
+|SELECT * FROM b
+|EXCEPT
+|SELECT * FROM c
+|INTERSECT
+|SELECT * FROM d
+  """.stripMargin
+
+val query2 =
+  """
+|SELECT * FROM a
+|UNION
+|SELECT * FROM b
+|EXCEPT ALL
+|SELECT * FROM c
+|INTERSECT ALL
+|SELECT * FROM d
+  """.stripMargin
+
+assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d)))
--- End diff --

also add `withSQLConf(SQLConf.SETOPS_PRECEDENCE_ENFORCED.key -> "true") {`
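
i.e., wrap the enforced-precedence assertions, assuming `withSQLConf` is 
available in this suite:

```
withSQLConf(SQLConf.SETOPS_PRECEDENCE_ENFORCED.key -> "true") {
  assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d)))
}
```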


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21941#discussion_r206763358
  
--- Diff: 
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---
@@ -17,6 +17,12 @@
 grammar SqlBase;
 
 @members {
+  /**
+   * When true, INTERSECT is given precedence over UNION and EXCEPT set 
operations as per
--- End diff --

> When true, INTERSECT is given precedence over UNION and EXCEPT set operations as per

->

> When true, INTERSECT is given higher precedence over the other set operations (UNION, EXCEPT and MINUS) as per


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19449: [SPARK-22219][SQL] Refactor code to get a value f...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19449#discussion_r206760031
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
 ---
@@ -82,4 +84,22 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite 
with SQLTestUtils {
   assert(checks.forall(_ == true))
 }
   }
+
+  test("SPARK-22219: refactor to control to generate comment") {
+withSQLConf(StaticSQLConf.CODEGEN_COMMENTS.key -> "false") {
+  val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 
2).count()
+.queryExecution.executedPlan)
+  assert(res.length == 2)
+  assert(res.forall{ case (_, code) =>
+!code.contains("* Codegend pipeline") && !code.contains("// 
input[")})
+}
+
+withSQLConf(StaticSQLConf.CODEGEN_COMMENTS.key -> "true") {
+  val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 
2).count()
+.queryExecution.executedPlan)
+  assert(res.length == 2)
+  assert(res.forall{ case (_, code) =>
+code.contains("* Codegend pipeline") && code.contains("// 
input[")})
+}
--- End diff --

combine these two?
```
Seq(true, false).foreach { flag =>
  ...
  if (flag) {
 ...
  } else {
...
  }
}
```
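
Spelled out, the combined version might look like this (flag-dependent 
expectations taken from the two originals):

```
Seq(true, false).foreach { flag =>
  withSQLConf(StaticSQLConf.CODEGEN_COMMENTS.key -> flag.toString) {
    val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
      .queryExecution.executedPlan)
    assert(res.length == 2)
    assert(res.forall { case (_, code) =>
      code.contains("* Codegend pipeline") == flag && code.contains("// input[") == flag
    })
  }
}
```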


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21752: [SPARK-24788][SQL] fixed UnresolvedException when toStri...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21752
  
ping @c-horn 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21938: [SPARK-24982][SQL] UDAF resolution should not throw Asse...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21938
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21911
  
@jzhuge I know all the major DBMSs have hints. Do you know whether any system 
has a hint like `Coalesce`? Could you check the systems that have sophisticated 
hints, like Oracle and DB2?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21939: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10....

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21939
  
@BryanCutler Thanks! What is the expected target release date of Apache 
Arrow 0.10.0?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21934: [SPARK-24951][SQL] Table valued functions should throw A...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21934
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21782: [SPARK-24816][SQL] SQL interface support repartitionByRa...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21782
  
+1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21403
  
I had a related discussion with @marmbrus a few months ago. He also does not 
like reusing the `IN` expression for subquery processing. I think it makes sense 
to introduce an `InSubquery` expression for subqueries. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21892: [SPARK-24945][SQL] Switching to uniVocity 2.7.2

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21892
  
@jbax Thanks for the info! 

ping @MaxGekk @HyukjinKwon 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...

2018-07-31 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20611#discussion_r206582602
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -303,94 +303,44 @@ case class LoadDataCommand(
   s"partitioned, but a partition spec was provided.")
   }
 }
-
-val loadPath =
+val loadPath = {
   if (isLocal) {
-val uri = Utils.resolveURI(path)
-val file = new File(uri.getPath)
-val exists = if (file.getAbsolutePath.contains("*")) {
-  val fileSystem = FileSystems.getDefault
-  val dir = file.getParentFile.getAbsolutePath
-  if (dir.contains("*")) {
-throw new AnalysisException(
-  s"LOAD DATA input path allows only filename wildcard: $path")
-  }
-
-  // Note that special characters such as "*" on Windows are not 
allowed as a path.
-  // Calling `WindowsFileSystem.getPath` throws an exception if 
there are in the path.
-  val dirPath = fileSystem.getPath(dir)
-  val pathPattern = new File(dirPath.toAbsolutePath.toString, 
file.getName).toURI.getPath
-  val safePathPattern = if (Utils.isWindows) {
-// On Windows, the pattern should not start with slashes for 
absolute file paths.
-pathPattern.stripPrefix("/")
-  } else {
-pathPattern
-  }
-  val files = new File(dir).listFiles()
-  if (files == null) {
-false
-  } else {
-val matcher = fileSystem.getPathMatcher("glob:" + 
safePathPattern)
-files.exists(f => 
matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
-  }
-} else {
-  new File(file.getAbsolutePath).exists()
-}
-if (!exists) {
-  throw new AnalysisException(s"LOAD DATA input path does not 
exist: $path")
-}
-uri
+val localFS = FileContext.getLocalFSFileContext()
+localFS.makeQualified(new Path(path))
   } else {
-val uri = new URI(path)
-val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != 
null) {
-  uri
-} else {
-  // Follow Hive's behavior:
-  // If no schema or authority is provided with non-local inpath,
-  // we will use hadoop configuration "fs.defaultFS".
-  val defaultFSConf = 
sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
-  val defaultFS = if (defaultFSConf == null) {
-new URI("")
-  } else {
-new URI(defaultFSConf)
-  }
-
-  val scheme = if (uri.getScheme() != null) {
-uri.getScheme()
-  } else {
-defaultFS.getScheme()
-  }
-  val authority = if (uri.getAuthority() != null) {
-uri.getAuthority()
-  } else {
-defaultFS.getAuthority()
-  }
-
-  if (scheme == null) {
-throw new AnalysisException(
-  s"LOAD DATA: URI scheme is required for non-local input 
paths: '$path'")
-  }
-
-  // Follow Hive's behavior:
-  // If LOCAL is not specified, and the path is relative,
-  // then the path is interpreted relative to "/user/"
-  val uriPath = uri.getPath()
-  val absolutePath = if (uriPath != null && 
uriPath.startsWith("/")) {
-uriPath
-  } else {
-s"/user/${System.getProperty("user.name")}/$uriPath"
-  }
-  new URI(scheme, authority, absolutePath, uri.getQuery(), 
uri.getFragment())
-}
-val hadoopConf = sparkSession.sessionState.newHadoopConf()
-val srcPath = new Path(hdfsUri)
-val fs = srcPath.getFileSystem(hadoopConf)
-if (!fs.exists(srcPath)) {
-  throw new AnalysisException(s"LOAD DATA input path does not 
exist: $path")
-}
-hdfsUri
+val loadPath = new Path(path)
+// Follow Hive's behavior:
+// If no schema or authority is provided with non-local inpath,
--- End diff --

Did we check the scheme and authority?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


