[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207846166

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1930,6 +1930,12 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.tryLogNonFatalError {
       _executorAllocationManager.foreach(_.stop())
     }
+    if (_dagScheduler != null) {
--- End diff --

Add a comment
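A comment of the kind requested might read as follows. This is a minimal sketch, assuming the shutdown sequence of `SparkContext.stop()` around the quoted hunk; only the comment text is new:

```scala
// Stop the DAGScheduler (which in turn stops the endpoints it owns, such as
// the BarrierCoordinator) before nulling out the reference. The null check
// guards against stop() being re-entered or called after a failed
// initialization, in which case _dagScheduler may never have been set.
if (_dagScheduler != null) {
  Utils.tryLogNonFatalError {
    _dagScheduler.stop()
  }
  _dagScheduler = null
}
```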
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207845712

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,230 @@
[license header, imports, and unchanged context elided; the full file is quoted in the comment at discussion_r207732505 below]
+private[spark] class BarrierCoordinator(
+    timeoutInSecs: Long,
+    listenerBus: LiveListenerBus,
+    override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging {
+
+  private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer")
--- End diff --

Add a comment above this line?
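One possible comment, as a hedged sketch; the stated rationale for the `lazy val` is inferred from the code, not from the thread:

```scala
// A timer used to schedule timeout checks on pending barrier() calls.
// Declared lazy so that the timer thread is only created once the
// coordinator receives its first global sync request.
private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer")
```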
[GitHub] spark issue #21898: [SPARK-24817][Core] Implement BarrierTaskContext.barrier...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21898

ok to test
[GitHub] spark issue #21998: [SPARK-24940][SQL] Use IntegerLiteral in ResolveCoalesce...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21998

ok to test
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207732505

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
[standard ASF license header elided]
+package org.apache.spark
+
+import java.util.{Timer, TimerTask}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Consumer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted}
+
+/**
+ * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use
+ * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from.
+ */
+private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) {
+  override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
+}
+
+/**
+ * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync
+ * request is generated by `BarrierTaskContext.barrier()`, and identified by
+ * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon
+ * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect
+ * enough global sync requests within a configured time, fail all the requests due to timeout.
+ */
+private[spark] class BarrierCoordinator(
+    timeout: Int,
+    listenerBus: LiveListenerBus,
+    override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging {
+
+  private val timer = new Timer("BarrierCoordinator barrier epoch increment timer")
+
+  // Listen to StageCompleted event, clear corresponding ContextBarrierState.
+  private val listener = new SparkListener {
+    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
+      val stageInfo = stageCompleted.stageInfo
+      val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber)
+      // Clear ContextBarrierState from a finished stage attempt.
+      cleanupBarrierStage(barrierId)
+    }
+  }
+
+  // Remember all active stage attempts that make barrier() call(s), and the corresponding
+  // internal state.
+  private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState]
+
+  override def onStart(): Unit = {
+    super.onStart()
+    listenerBus.addToStatusQueue(listener)
+  }
+
+  /**
+   * Provide current state of a barrier() call, the state is created when a new stage attempt send
+   * out a barrier() call, and recycled on stage completed.
+   *
+   * @param barrierId Identifier of the barrier stage that make a barrier() call.
+   * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall
+   *                 collect `numTasks` requests to succeed.
+   */
+  private class ContextBarrierState(
+      val barrierId: ContextBarrierId,
+      val numTasks: Int) {
+
+    // There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used
+    // to identify each barrier() call. It shall get increased when a barrier() call succeed, or
+    // reset when a barrier() call fail due to timeout.
+    private var barrierEpoch: Int = 0
+
+    // An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier()
+    // call.
+    private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks)
+
+    // A timer task that ensures we may timeout for a barrier() call.
+    private var timerTask: TimerTask = null
+
+    // Init a TimerTask for a barrier() cal
[message truncated in the archive; the review comment was not preserved]
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207732072

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
[unchanged context elided; see the full quote above]
+private[spark] class BarrierCoordinator(
+    timeout: Int,
--- End diff --

-> `timeoutInSecs`
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731918

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
[quoted diff omitted; verbatim duplicate of the BarrierCoordinator.scala context quoted above. The message is truncated in the archive and the review comment was not preserved.]
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731620

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
[unchanged context elided; see the full quote above]
+  /**
+   * Provide current state of a barrier() call, the state is created when a new stage attempt send
+   * out a barrier() call, and recycled on stage completed.
--- End diff --

`Provide current state of a barrier() call, the state` -> `Provide the current state of a barrier() call. The state`
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731501

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
[unchanged context elided; see the full quote above]
+  // Remember all active stage attempts that make barrier() call(s), and the corresponding
+  // internal state.
--- End diff --

`Remember` -> `Record`
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731274

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
[unchanged context elided; see the full quote above]
+ * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon
+ * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect
+ * enough global sync requests within a configured time, fail all the requests due to timeout.
--- End diff --

`doesn't collect` -> `is unable to collect`
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731107

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
[quoted diff omitted; verbatim duplicate of the BarrierCoordinator.scala context quoted above. The message is truncated in the archive and the review comment was not preserved.]
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731198

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
[unchanged context elided; see the full quote above]
+/**
+ * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use
+ * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from.
+ */
--- End diff --

```Scala
Since only one barrier() call is triggered by each barrier stage attempt, we use (stageId, stageAttemptId) to identify the stage attempt which the barrier() call is from.
```
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731249

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
[unchanged context elided; see the full quote above]
+ * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon
+ * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect
--- End diff --

`received all the requests for a group of `barrier()` calls` -> `all the requests for a group of `barrier()` calls are received`
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731446

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
@@ -0,0 +1,225 @@
[unchanged context elided; see the full quote above]
+ * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect
+ * enough global sync requests within a configured time, fail all the requests due to timeout.
--- End diff --

`due to timeout` -> `and return a timeout exception`
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207730912

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
[quoted diff omitted; verbatim duplicate of the BarrierCoordinator.scala context quoted above. The message is truncated in the archive and the review comment was not preserved.]
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207730852

--- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala ---
[quoted diff omitted; verbatim duplicate of the BarrierCoordinator.scala context quoted above. The message is truncated in the archive and the review comment was not preserved.]
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207730025

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -567,4 +567,14 @@ package object config {
     .intConf
     .checkValue(v => v > 0, "The value should be a positive integer.")
     .createWithDefault(2000)
+
+  private[spark] val BARRIER_SYNC_TIMEOUT =
+    ConfigBuilder("spark.barrier.sync.timeout")
+      .doc("The timeout in seconds for each barrier() call from a barrier task. If the " +
+        "coordinator didn't receive all the sync messages from barrier tasks within the " +
+        "configured time, throw a SparkException to fail all the tasks. The default value is " +
+        "set to 31536000 (3600 * 24 * 365) so the barrier() call shall wait for one year.")
+      .intConf
--- End diff --

`.timeConf(TimeUnit.SECONDS)`?
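A minimal sketch of what the suggested change might look like, assuming Spark's internal `ConfigBuilder` API, whose `timeConf(TimeUnit)` accepts duration strings such as "30s" or "5m" and resolves them to the given unit; the `checkValue` message is illustrative:

```scala
import java.util.concurrent.TimeUnit

private[spark] val BARRIER_SYNC_TIMEOUT =
  ConfigBuilder("spark.barrier.sync.timeout")
    .doc("The timeout in seconds for each barrier() call from a barrier task. If the " +
      "coordinator didn't receive all the sync messages from barrier tasks within the " +
      "configured time, throw a SparkException to fail all the tasks.")
    .timeConf(TimeUnit.SECONDS)            // parses "30s", "5m", ... into seconds
    .checkValue(v => v > 0, "The value should be a positive time value.")
    .createWithDefault(TimeUnit.DAYS.toSeconds(365))  // one year, in seconds
```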
[GitHub] spark pull request #21994: [SPARK-24529][Build][test-maven][follow-up] Add s...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21994#discussion_r207729852

--- Diff: pom.xml ---
@@ -2609,6 +2609,28 @@
+
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
--- End diff --

Actually, after adding this plugin, I was unable to perform parallel builds. https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3
[GitHub] spark pull request #21994: [SPARK-24529][Build][test-maven][follow-up] Add s...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21994#discussion_r207729865

--- Diff: pom.xml ---
@@ -2609,6 +2609,28 @@
+
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
--- End diff --

It sounds like this plugin is not thread-safe.
[GitHub] spark issue #21948: [SPARK-24991][SQL] use InternalRow in DataSourceWriter
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21948

retest this please
[GitHub] spark issue #17185: [SPARK-19602][SQL] Support column resolution of fully qu...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/17185

retest this please
[GitHub] spark issue #21982: [SPARK-23911][SQL] Add aggregate function.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21982

retest this please
[GitHub] spark issue #21982: [SPARK-23911][SQL] Add aggregate function.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21982

@ueshin You need to address the conflicts again. :)
[GitHub] spark pull request #21970: [SPARK-24996][SQL] Use DSL in DeclarativeAggregat...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21970#discussion_r207701793

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala ---
@@ -111,23 +111,23 @@ abstract class CentralMomentAgg(child: Expression)
     val delta2 = delta * delta
     val deltaN2 = deltaN * deltaN
     val newM3 = if (momentOrder >= 3) {
-      m3 - Literal(3.0) * deltaN * newM2 + delta * (delta2 - deltaN2)
+      m3 - deltaN * 3.0 * newM2 + delta * (delta2 - deltaN2)
--- End diff --

Please avoid these changes
[GitHub] spark pull request #21403: [SPARK-24341][SQL] Support only IN subqueries wit...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21403#discussion_r207701674

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -505,6 +505,7 @@ object NullPropagation extends Rule[LogicalPlan] {
     // If the value expression is NULL then transform the In expression to null literal.
     case In(Literal(null, _), _) => Literal.create(null, BooleanType)
+    case InSubquery(Seq(Literal(null, _)), _) => Literal.create(null, BooleanType)
--- End diff --

Add a test case in OptimizeInSuite
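A hedged sketch of such a test. It assumes OptimizeInSuite's existing harness (its `Optimize` executor, `testRelation`, and `comparePlans` from PlanTest) plus a hypothetical second relation `testRelation2` for the subquery; the real test may need the suite's subquery helpers to produce a fully resolved plan:

```scala
test("OptimizeIn: NULL IN (subquery) folds to a null literal") {
  // NULL IN (...) can never evaluate to TRUE or FALSE, so NullPropagation
  // should rewrite the whole predicate to a null Boolean literal.
  val originalQuery = testRelation
    .where(InSubquery(Seq(Literal.create(null, NullType)),
      ListQuery(testRelation2.select('b))))
    .analyze

  val optimized = Optimize.execute(originalQuery)

  val correctAnswer = testRelation
    .where(Literal.create(null, BooleanType))
    .analyze

  comparePlans(optimized, correctAnswer)
}
```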
[GitHub] spark pull request #21403: [SPARK-24341][SQL] Support only IN subqueries wit...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21403#discussion_r207701622

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala ---
@@ -154,7 +154,7 @@ class ExpressionParserSuite extends PlanTest {
   test("in sub-query") {
     assertEqual(
       "a in (select b from c)",
-      In('a, Seq(ListQuery(table("c").select('b)))))
+      InSubquery(Seq('a), ListQuery(table("c").select('b))))
--- End diff --

Could you add more cases in this test case? For example, when the input is CreateNamedStruct
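A hedged sketch of an extra case of the kind requested, where the left-hand side is a multi-column tuple that the parser builds as a CreateNamedStruct before unpacking it into the InSubquery values; the table and column names are illustrative:

```scala
// Multi-column IN: "(a, b)" is parsed as a struct and should be unpacked
// into the InSubquery value expressions.
assertEqual(
  "(a, b) in (select c from d)",
  InSubquery(Seq('a, 'b), ListQuery(table("d").select('c))))
```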
[GitHub] spark pull request #21403: [SPARK-24341][SQL] Support only IN subqueries wit...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21403#discussion_r207701506

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -505,6 +505,7 @@ object NullPropagation extends Rule[LogicalPlan] {
     // If the value expression is NULL then transform the In expression to null literal.
     case In(Literal(null, _), _) => Literal.create(null, BooleanType)
+    case InSubquery(Seq(Literal(null, _)), _) => Literal.create(null, BooleanType)
--- End diff --

Thanks for adding this! Please double-check all the cases of `IN` in all the optimizer rules. We are afraid this new expression might introduce a regression.
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21403

retest this please
[GitHub] spark issue #21909: [SPARK-24959][SQL] Speed up count() for JSON and CSV
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21909

Please document it in the migration guide.
[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r207701331

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1476,6 +1476,14 @@ object SQLConf {
     "are performed before any UNION, EXCEPT and MINUS operations.")
     .booleanConf
     .createWithDefault(false)
+
+  val BYPASS_PARSER_FOR_EMPTY_SCHEMA = buildConf("spark.sql.bypassParserForEmptySchema")
--- End diff --

Let us get rid of this in the next release. Mark it as internal and use the legacy naming scheme.
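A sketch of what that suggestion might look like, assuming SQLConf's builder API and the usual `spark.sql.legacy.*` naming convention for flags slated for removal; the doc text is illustrative:

```scala
val BYPASS_PARSER_FOR_EMPTY_SCHEMA =
  buildConf("spark.sql.legacy.bypassParserForEmptySchema")
    .internal()  // hidden from user-facing documentation
    .doc("When true, count()-style queries over JSON/CSV skip running the parser " +
      "when the required schema is empty. To be removed in a future release.")
    .booleanConf
    .createWithDefault(true)
```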
[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21889#discussion_r207701260

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
@@ -0,0 +1,205 @@
[standard ASF license header elided]
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+    extends QueryTest
+    with ParquetTest
+    with FileSchemaPruningTest
+    with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  val contacts =
+    Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+      relatives = Map("brother" -> johnDoe)) ::
+    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  val briefContacts =
+    BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+    BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map(),
+    p: Int)
+
+  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)
+
+  val contactsWithDataPartitionColumn =
+    contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
+      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+  val briefContactsWithDataPartitionColumn =
+    briefContacts.map { case BriefContact(id, name, address) =>
+      BriefContactWithDataPartitionColumn(id, name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+    val query = sql("select name.middle from contacts order by id")
+    checkScanSchemata(query, "struct>")
+    checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field and its parent struct") {
+    val query = sql("select name.middle, name from contacts order by id")
+    checkScanSchemata(query, "struct>")
+    checkAnswer(query,
+      Row("X.", Row("Jane", "X.", "Doe")) ::
+      Row("Y.", Row("John", "Y.", "Doe")) ::
+      Row(null, Row("Janet", null, "Jones")) ::
+      Row(null, Row("Jim", null, "Jones")) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and its parent struct array") {
+    val query = sql("select friends.middle, friends from contacts where p=1 order by id")
+    checkScanSchemata(query,
+      "struct>>")
+    checkAnswer(query,
+      Row(Array("Z."
[message truncated in the archive; the rest of the quoted diff and the review comment were not preserved. The schema strings above were mangled by the archive's HTML stripping and are kept as archived.]
[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21911 You can address the comment in the follow-up PR. Thanks!
[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21911 LGTM. Thanks! Merged to master.
[GitHub] spark pull request #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21911#discussion_r207701114

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala ---

@@ -102,6 +104,32 @@ object ResolveHints {
     }
   }

+  /**
+   * COALESCE Hint accepts name "COALESCE" and "REPARTITION".
+   * Its parameter includes a partition number.
+   */
+  object ResolveCoalesceHints extends Rule[LogicalPlan] {
+    private val COALESCE_HINT_NAMES = Set("COALESCE", "REPARTITION")
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+      case h: UnresolvedHint if COALESCE_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
+        val hintName = h.name.toUpperCase(Locale.ROOT)
+        val shuffle = hintName match {
+          case "REPARTITION" => true
+          case "COALESCE" => false
+        }
+        val numPartitions = h.parameters match {
+          case Seq(Literal(numPartitions: Int, IntegerType)) =>

--- End diff --

Use `IntegerLiteral`
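For context, `IntegerLiteral` is an existing Catalyst extractor that matches a `Literal` of `IntegerType`, so the pattern above collapses to a single case. A minimal sketch of the suggested change (the error branch and its message are assumptions, not from the diff):

```
val numPartitions = h.parameters match {
  case Seq(IntegerLiteral(n)) => n
  case _ =>
    // Hypothetical fallback; the PR's actual error handling may differ.
    throw new AnalysisException(s"$hintName Hint expects a partition number as parameter")
}
```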
[GitHub] spark issue #21993: [SPARK-24983][Catalyst] Add configuration for maximum nu...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21993 @dvogelbacher Currently, in the master branch (the 2.4 release), there is a workaround: add `CollapseProject` to `spark.sql.optimizer.excludedRules` before running such queries.
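For readers looking for the exact incantation, the workaround is a one-liner (a sketch; the rule must be referenced by its fully qualified class name):

```
spark.conf.set(
  "spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.CollapseProject")
```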
[GitHub] spark issue #21993: [SPARK-24983][Catalyst] Add configuration for maximum nu...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21993 Let us blacklist CASE WHEN in CollapseProject, instead of introducing this new conf.
[GitHub] spark issue #21965: [SPARK-23909][SQL] Add filter function.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21965 cc @hvanhovell
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21941 LGTM. Thanks! Merged to master.
[GitHub] spark issue #21964: [SPARK-24788][SQL] RelationalGroupedDataset.toString wit...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21964 How about KeyValueGroupedDataset?
[GitHub] spark issue #21932: [SPARK-24979][SQL] add AnalysisHelper#resolveOperatorsUp
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21932 It sounds reasonable to me. cc @rxin
[GitHub] spark issue #21754: [SPARK-24705][SQL] ExchangeCoordinator broken when dupli...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21754 Thanks! Merged to master.
[GitHub] spark issue #21965: [WIP][SPARK-23909][SQL] Add filter function.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21965 @ueshin Please rebase it. Thanks!
[GitHub] spark issue #21977: SPARK-25004: Add spark.executor.pyspark.memory limit.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21977 cc @ueshin
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21941 retest this please
[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21911

> Which test suite is a good place to add such an end-to-end case?

org.apache.spark.sql.SQLQuerySuite might be the best place.

> Do we plan to support a call such as df.hint("COALESCE", Seq(10))? Why not just use df.coalesce(10)?

It also does not make sense to block it. Since this is a new hint, we should check whether it works.
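For reference, the two Dataset-side forms being compared look like this (a minimal sketch; `df` is any DataFrame):

```
df.hint("coalesce", 10)  // goes through the new hint resolution rule
df.coalesce(10)          // calls the operator directly
```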
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20611

> Yes, there is a change in the behavior. As I mentioned above in the description, we will now be able to support wildcards even at the folder level for local file systems. Previous versions throw an exception in such scenarios.

Anything else?
[GitHub] spark issue #21973: [BUILD] Fix lint-python.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21973 LGTM
[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21911 cc @maryannxue Please review it.
[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce and Repartition Hint for SQL...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21911

Generally, it looks good to me. Let us target this to the upcoming 2.4 release. We need to improve the test coverage:
- Add a test case in which users specify multiple REPARTITION/COALESCE hints: `... INSERT INTO s SELECT /*+ REPARTITION(100), COALESCE(500), COALESCE(10) */ * FROM t ...`
- Add a test case in which users specify BROADCAST and REPARTITION hints at the same time.
- Add an end-to-end test case.
- Add a unit test case in ResolveHintsSuite.scala to verify that the hint names are case insensitive.
- Add more negative test cases, for example `"SELECT /*+ COALESCE(1.0) */ * FROM t"` and `"SELECT /*+ COALESCE(3 + 4) */ * FROM t"` (one possible shape is sketched below).
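One possible shape for such a negative test, sketched under the assumption that invalid hint parameters fail analysis with an `AnalysisException` (the test name and asserted message fragment are guesses, not from the PR):

```
test("invalid COALESCE hint parameter") {
  withTable("t") {
    sql("CREATE TABLE t(id INT) USING parquet")
    val e = intercept[AnalysisException] {
      sql("SELECT /*+ COALESCE(1.0) */ * FROM t")
    }
    // Only asserting the hint name appears; the exact message is unknown.
    assert(e.getMessage.contains("COALESCE"))
  }
}
```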
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21941 retest this please
[GitHub] spark issue #21969: [SPARK-24945][SQL] Switching to uniVocity 2.7.3
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21969 retest this please
[GitHub] spark issue #21935: [SPARK-24773] Avro: support logical timestamp type with ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21935 retest this please
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21954 retest this please
[GitHub] spark issue #21962: [SPARK-24865][FOLLOW-UP] Remove AnalysisBarrier LogicalP...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21962 cc @rxin @cloud-fan
[GitHub] spark issue #21951: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21951 https://issues.apache.org/jira/browse/SPARK-24996 is created.
[GitHub] spark pull request #21962: [SPARK-24865] Remove AnalysisBarrier LogicalPlan ...
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/21962

[SPARK-24865] Remove AnalysisBarrier LogicalPlan Node

## What changes were proposed in this pull request?

Remove the AnalysisBarrier LogicalPlan node, which is useless now.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark refactor2

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21962.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21962

commit 7f70aaadbad680f41b4b3f42798c2f64e94e1e6a
Author: Xiao Li
Date: 2018-08-02T06:03:41Z

    clean
[GitHub] spark issue #21951: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21951 Thanks! Merged to master. Please ignore the last commit.
[GitHub] spark issue #19449: [SPARK-22219][SQL] Refactor code to get a value for "spa...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19449 ok to test
[GitHub] spark issue #21951: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21951 This will simplify the code and improve its readability. We can do the same in the other expressions.
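To illustrate the kind of simplification meant here: Catalyst's expression dsl lets arithmetic and casts on expressions read like plain Scala. A rough sketch with simplified attributes (not the actual Average code):

```
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.{DoubleType, LongType}

val sum = AttributeReference("sum", DoubleType)()
val count = AttributeReference("count", LongType)()

// before the refactor, spelled out as: Divide(sum, Cast(count, DoubleType))
val avg = sum / count.cast(DoubleType)
```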
[GitHub] spark issue #21958: [minor] remove dead code in ExpressionEvalHelper
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21958 LGTM
[GitHub] spark issue #19449: [SPARK-22219][SQL] Refactor code to get a value for "spa...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19449 retest this please
[GitHub] spark issue #21752: [SPARK-24788][SQL] fixed UnresolvedException when toStri...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21752 @maropu Maybe you can take it over?
[GitHub] spark issue #21954: [SPARK-23908][SQL] Add transform function.
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21954 cc @hvanhovell
[GitHub] spark issue #21951: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21951 cc @hvanhovell @rednaxelafx @cloud-fan
[GitHub] spark pull request #21951: [SPARK-24957][SQL][FOLLOW-UP] Clean the code for ...
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/21951

[SPARK-24957][SQL][FOLLOW-UP] Clean the code for AVERAGE

## What changes were proposed in this pull request?

This PR refactors the code in AVERAGE using the expression dsl.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gatorsmile/spark refactor1

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21951.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21951

commit 511b7b6bbf4e22092c024dd0deb58daa6bb23b4b
Author: Xiao Li
Date: 2018-08-02T00:24:05Z

    clean up
[GitHub] spark issue #21087: [SPARK-23997][SQL] Configurable maximum number of bucket...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21087 ok to test
[GitHub] spark issue #21949: [SPARK-24957][SQL][BACKPORT-2.2] Average with decimal fo...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21949 Thanks! Merged to 2.2. Could you close this PR?
[GitHub] spark issue #21898: [SPARK-24817][Core] Implement BarrierTaskContext.barrier...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21898 retest this please
[GitHub] spark issue #21915: [SPARK-24954][Core] Fail fast on job submit if run a bar...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21915 retest this please
[GitHub] spark issue #21946: [SPARK-24990][SQL] merge ReadSupport and ReadSupportWith...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21946 @rdblue This change is pretty isolated, and it also looks good to me. Since you are fine with the change, I am assuming you are not blocking it. I will merge this soon.
[GitHub] spark issue #21892: [SPARK-24945][SQL] Switching to uniVocity 2.7.2
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21892 Great! Let us wait for the 2.7.3 build. @jbax When will it be released?
[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r207032024

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---

@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
+      } else {
+        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))

--- End diff --

Both? If we introduce a behavior change, we need to document it in the migration guide and add a conf. Users can set the conf to revert to the previous behavior.
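Illustrative only — the escape hatch asked for here might gate the fast path on a legacy flag, roughly like the sketch below (the conf name is invented for illustration, not from the PR):

```
import org.apache.spark.sql.internal.SQLConf

// In FailureSafeParser: only skip parsing when the user has not opted out.
private val alwaysParse = SQLConf.get
  .getConfString("spark.sql.legacy.alwaysParseRecords", "false").toBoolean
private val skipParsing = !alwaysParse && optimizeEmptySchema && schema.isEmpty
```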
[GitHub] spark issue #19449: [SPARK-22219][SQL] Refactor code to get a value for "spa...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19449 LGTM pending Jenkins.
[GitHub] spark issue #21883: [SPARK-24937][SQL] Datasource partition table should loa...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21883 LGTM. Thanks! Merged to master.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r207020824

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala ---

@@ -78,7 +93,8 @@ object CommandUtils extends Logging {
     val size = if (fileStatus.isDirectory) {
       fs.listStatus(path)
         .map { status =>
-          if (!status.getPath.getName.startsWith(stagingDir)) {
+          if (!status.getPath.getName.startsWith(stagingDir) &&
+              DataSourceUtils.isDataPath(path)) {

--- End diff --

This is also a behavior change. Could you document it too?
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r207019185

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

@@ -1451,6 +1451,15 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val SETOPS_PRECEDENCE_ENFORCED =
+    buildConf("spark.sql.setops.precedence.enforced")
+      .doc("When set to true and order of evaluation is not specified by parentheses, " +
+        "INTERSECT operations are performed before any UNION, EXCEPT amd MINUS operations. " +

--- End diff --

Typo: "amd" should be "and".
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r207018987

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

@@ -1451,6 +1451,15 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val SETOPS_PRECEDENCE_ENFORCED =
+    buildConf("spark.sql.setops.precedence.enforced")
+      .doc("When set to true and order of evaluation is not specified by parentheses, " +
+        "INTERSECT operations are performed before any UNION, EXCEPT amd MINUS operations. " +
+        "When set to false and the order of evaluation is not specified by parentheses, the" +
+        "set operations are performed from left to right as they appear in the query.")
+      .booleanConf

--- End diff --

let us mark it internal.
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r207018914

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

@@ -1451,6 +1451,15 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val SETOPS_PRECEDENCE_ENFORCED =
+    buildConf("spark.sql.setops.precedence.enforced")

--- End diff --

spark.sql.legacy.setopsPrecedence.enabled
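Putting the comments on this conf together (the legacy name, `.internal()`, and the doc fixes), the definition might end up roughly as below. Note the polarity would flip with the legacy naming, since `true` would now select the old left-to-right behavior — an inference from the suggested name, not committed code:

```
val LEGACY_SETOPS_PRECEDENCE_ENABLED =
  buildConf("spark.sql.legacy.setopsPrecedence.enabled")
    .internal()
    .doc("When set to true and the order of evaluation is not specified by " +
      "parentheses, the set operations are performed from left to right as " +
      "they appear in the query. When set to false, INTERSECT operations are " +
      "performed before any UNION, EXCEPT and MINUS operations.")
    .booleanConf
    .createWithDefault(false)
```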
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r207017248

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

@@ -1449,6 +1449,13 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
+    buildConf("spark.sql.execution.computeStatsListFilesInParallel")
+      .internal()
+      .doc("If True, File listing for compute statistics is done in parallel.")

--- End diff --

When true, SQL commands use parallel file listing, as opposed to single thread listing. This usually speeds up commands that need to list many directories.
[GitHub] spark pull request #21608: [SPARK-24626] [SQL] Improve location size calcula...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21608#discussion_r207017122

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

@@ -1449,6 +1449,13 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
+    buildConf("spark.sql.execution.computeStatsListFilesInParallel")

--- End diff --

How about `spark.sql.parallelFileListingInCommands.enabled`?
[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21921 It sounds like GitHub is experiencing very bad delays. @cloud-fan Could you submit a follow-up PR to address the comments from @rdblue?
[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21921 @rdblue I do not think it is documented. Let us be more conservative and collect LGTMs from committers, no matter whether the PR author is a committer or not.
[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r206985104

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala ---

@@ -56,9 +57,14 @@ class FailureSafeParser[IN](
     }
   }

+  private val skipParsing = optimizeEmptySchema && schema.isEmpty
   def parse(input: IN): Iterator[InternalRow] = {
     try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+      if (skipParsing) {
+        Iterator.single(InternalRow.empty)
+      } else {
+        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))

--- End diff --

Could you add a test case for counting both CSV and JSON sources when the files have broken records? Is there any behavior change after this PR?
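A possible shape for the requested test. Whether the expected count should include the broken line is exactly the behavior question raised in the comment; the assertion below assumes the new skip-parsing semantics (a sketch only, with `testImplicits` assumed in scope):

```
withTempPath { dir =>
  import testImplicits._
  // one valid and one malformed JSON record
  Seq("""{"a": 1}""", """{"a": 2""").toDS().write.text(dir.getAbsolutePath)
  // With the fast path, count() never parses the records, so both lines count.
  assert(spark.read.json(dir.getAbsolutePath).count() == 2)
}
```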
[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21921 retest this please
[GitHub] spark issue #21921: [SPARK-24971][SQL] remove SupportsDeprecatedScanRow
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21921 @cloud-fan To be safe, let us get one more LGTM from another committer.
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206763936

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

@@ -1451,6 +1451,15 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val SETOPS_PRECEDENCE_ENFORCED =
+    buildConf("spark.sql.setops.precedence.enforced")

--- End diff --

Let me think about the name of the conf.
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206763732

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---

@@ -1451,6 +1451,15 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+
+  val SETOPS_PRECEDENCE_ENFORCED =
+    buildConf("spark.sql.setops.precedence.enforced")
+      .doc("When set to true and order of evaluation is not specified by parentheses, " +
+        "INTERSECT operations are performed before any UNION or EXCEPT operations. " +

--- End diff --

also include MINUS
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206763501

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala ---

@@ -676,4 +677,42 @@ class PlanParserSuite extends AnalysisTest {
       OneRowRelation().select('rtrim.function("c&^,.", "bc...,,,&&&ccc"))
     )
   }
+
+  test("precedence of set operations") {
+    val a = table("a").select(star())
+    val b = table("b").select(star())
+    val c = table("c").select(star())
+    val d = table("d").select(star())
+
+    val query1 =
+      """
+        |SELECT * FROM a
+        |UNION
+        |SELECT * FROM b
+        |EXCEPT
+        |SELECT * FROM c
+        |INTERSECT
+        |SELECT * FROM d
+      """.stripMargin
+
+    val query2 =
+      """
+        |SELECT * FROM a
+        |UNION
+        |SELECT * FROM b
+        |EXCEPT ALL
+        |SELECT * FROM c
+        |INTERSECT ALL
+        |SELECT * FROM d
+      """.stripMargin
+
+    assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d)))

--- End diff --

also add `withSQLConf(SQLConf.SETOPS_PRECEDENCE_ENFORCED.key -> "true") {`
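Concretely, the suggested wrapping would look like this (a sketch that reuses the assertion from the diff, assuming `withSQLConf` is available in the suite):

```
withSQLConf(SQLConf.SETOPS_PRECEDENCE_ENFORCED.key -> "true") {
  assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d)))
}
```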
[GitHub] spark pull request #21941: [SPARK-24966][SQL] Implement precedence rules for...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21941#discussion_r206763358

--- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---

@@ -17,6 +17,12 @@ grammar SqlBase;

 @members {
+  /**
+   * When true, INTERSECT is given precedence over UNION and EXCEPT set operations as per

--- End diff --

> When true, INTERSECT is given precedence over UNION and EXCEPT set operations as per

->

> When true, INTERSECT is given greater precedence over the other set operations (UNION, EXCEPT and MINUS) as per
[GitHub] spark pull request #19449: [SPARK-22219][SQL] Refactor code to get a value f...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19449#discussion_r206760031

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala ---

@@ -82,4 +84,22 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
     assert(checks.forall(_ == true))
   }
 }
+
+  test("SPARK-22219: refactor to control to generate comment") {
+    withSQLConf(StaticSQLConf.CODEGEN_COMMENTS.key -> "false") {
+      val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
+        .queryExecution.executedPlan)
+      assert(res.length == 2)
+      assert(res.forall { case (_, code) =>
+        !code.contains("* Codegend pipeline") && !code.contains("// input[") })
+    }
+
+    withSQLConf(StaticSQLConf.CODEGEN_COMMENTS.key -> "true") {
+      val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
+        .queryExecution.executedPlan)
+      assert(res.length == 2)
+      assert(res.forall { case (_, code) =>
+        code.contains("* Codegend pipeline") && code.contains("// input[") })
+    }

--- End diff --

Combine these two?

```
Seq(true, false).foreach { flag =>
  ...
  if (flag) {
    ...
  } else {
    ...
  }
}
```
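Filled in, the combined loop might read as follows (a sketch, assuming the two original branches differ only in whether the codegen comments are expected to be present):

```
Seq(true, false).foreach { flag =>
  withSQLConf(StaticSQLConf.CODEGEN_COMMENTS.key -> flag.toString) {
    val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
      .queryExecution.executedPlan)
    assert(res.length == 2)
    // Comments must appear exactly when the conf is enabled.
    assert(res.forall { case (_, code) =>
      code.contains("* Codegend pipeline") == flag &&
        code.contains("// input[") == flag
    })
  }
}
```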
[GitHub] spark issue #21752: [SPARK-24788][SQL] fixed UnresolvedException when toStri...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21752 ping @c-horn
[GitHub] spark issue #21938: [SPARK-24982][SQL] UDAF resolution should not throw Asse...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21938 retest this please
[GitHub] spark issue #21911: [SPARK-24940][SQL] Coalesce Hint for SQL Queries
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21911 @jzhuge I know all the major DBMSs have hints. Do you know whether any system has a hint like `Coalesce`? Could you check the systems that have sophisticated hints, like Oracle and DB2?
[GitHub] spark issue #21939: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10....
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21939 @BryanCutler Thanks! What is the expected target release date of Apache Arrow 0.10.0?
[GitHub] spark issue #21934: [SPARK-24951][SQL] Table valued functions should throw A...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21934 retest this please
[GitHub] spark issue #21782: [SPARK-24816][SQL] SQL interface support repartitionByRa...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21782 +1
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21403 I had a related discussion with @marmbrus a few months ago. He also does not like reusing the `IN` expression for subquery processing. I think it makes sense to introduce an `InSubquery` expression for subqueries.
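Roughly, the dedicated expression could take this shape (a sketch of the direction discussed, not a committed design):

```
import org.apache.spark.sql.catalyst.expressions.{Expression, ListQuery, Predicate, Unevaluable}

case class InSubquery(values: Seq[Expression], query: ListQuery)
  extends Predicate with Unevaluable {

  override def children: Seq[Expression] = values :+ query
  override def nullable: Boolean = children.exists(_.nullable)
  override def toString: String = s"${values.mkString("(", ", ", ")")} IN ($query)"
}
```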
[GitHub] spark issue #21892: [SPARK-24945][SQL] Switching to uniVocity 2.7.2
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21892 @jbax Thanks for the info! ping @MaxGekk @HyukjinKwon
[GitHub] spark pull request #20611: [SPARK-23425][SQL]Support wildcard in HDFS path f...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20611#discussion_r206582602

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---

@@ -303,94 +303,44 @@ case class LoadDataCommand(
         s"partitioned, but a partition spec was provided.")
       }
     }
-
-    val loadPath =
+    val loadPath = {
       if (isLocal) {
-        val uri = Utils.resolveURI(path)
-        val file = new File(uri.getPath)
-        val exists = if (file.getAbsolutePath.contains("*")) {
-          val fileSystem = FileSystems.getDefault
-          val dir = file.getParentFile.getAbsolutePath
-          if (dir.contains("*")) {
-            throw new AnalysisException(
-              s"LOAD DATA input path allows only filename wildcard: $path")
-          }
-
-          // Note that special characters such as "*" on Windows are not allowed as a path.
-          // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path.
-          val dirPath = fileSystem.getPath(dir)
-          val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath
-          val safePathPattern = if (Utils.isWindows) {
-            // On Windows, the pattern should not start with slashes for absolute file paths.
-            pathPattern.stripPrefix("/")
-          } else {
-            pathPattern
-          }
-          val files = new File(dir).listFiles()
-          if (files == null) {
-            false
-          } else {
-            val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern)
-            files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
-          }
-        } else {
-          new File(file.getAbsolutePath).exists()
-        }
-        if (!exists) {
-          throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
-        }
-        uri
+        val localFS = FileContext.getLocalFSFileContext()
+        localFS.makeQualified(new Path(path))
       } else {
-        val uri = new URI(path)
-        val hdfsUri = if (uri.getScheme() != null && uri.getAuthority() != null) {
-          uri
-        } else {
-          // Follow Hive's behavior:
-          // If no schema or authority is provided with non-local inpath,
-          // we will use hadoop configuration "fs.defaultFS".
-          val defaultFSConf = sparkSession.sessionState.newHadoopConf().get("fs.defaultFS")
-          val defaultFS = if (defaultFSConf == null) {
-            new URI("")
-          } else {
-            new URI(defaultFSConf)
-          }
-
-          val scheme = if (uri.getScheme() != null) {
-            uri.getScheme()
-          } else {
-            defaultFS.getScheme()
-          }
-          val authority = if (uri.getAuthority() != null) {
-            uri.getAuthority()
-          } else {
-            defaultFS.getAuthority()
-          }
-
-          if (scheme == null) {
-            throw new AnalysisException(
-              s"LOAD DATA: URI scheme is required for non-local input paths: '$path'")
-          }
-
-          // Follow Hive's behavior:
-          // If LOCAL is not specified, and the path is relative,
-          // then the path is interpreted relative to "/user/"
-          val uriPath = uri.getPath()
-          val absolutePath = if (uriPath != null && uriPath.startsWith("/")) {
-            uriPath
-          } else {
-            s"/user/${System.getProperty("user.name")}/$uriPath"
-          }
-          new URI(scheme, authority, absolutePath, uri.getQuery(), uri.getFragment())
-        }
-        val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val srcPath = new Path(hdfsUri)
-        val fs = srcPath.getFileSystem(hadoopConf)
-        if (!fs.exists(srcPath)) {
-          throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
-        }
-        hdfsUri
+        val loadPath = new Path(path)
+        // Follow Hive's behavior:
+        // If no schema or authority is provided with non-local inpath,

--- End diff --

Do we still check the scheme and authority here?