[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21482 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91518/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21482 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21482 **[Test build #91518 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91518/testReport)** for PR 21482 at commit [`6a4d46e`](https://github.com/apache/spark/commit/6a4d46e0a9ab403364e26a7b8f16c9ca94c31a2e). * This patch **fails due to an unknown error code, -9**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21482 **[Test build #91517 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91517/testReport)** for PR 21482 at commit [`f240fdf`](https://github.com/apache/spark/commit/f240fdf3a410e2fdec1fa668bc0218ac61078423). * This patch **fails due to an unknown error code, -9**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21482 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91517/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21482 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user kokes commented on the issue: https://github.com/apache/spark/pull/13599 Hi, thanks for all the work on this! I see requirements.txt mentioned here and there and, browsing this and other JIRAs, it seems to be the proposed way to specify dependencies in PySpark. As you probably know, the community has rallied around [Pipfile](https://github.com/pypa/pipfile)s as a replacement for requirements.txt. This has a few upsides (including a lock file), the main one being that the reference implementation ([Pipenv](http://pipenv.org/)) allows for installing packages into a new virtualenv directly, without having to activate it or run other commands. So that combines dependency management, reproducibility, and environment isolation. (Also, if one doesn't want said packages to be installed in a venv, there's an argument to install them system-wide.) I'm not proposing this PR gets extended to support Pipfiles, I just wanted to ask if this has been considered and is on the roadmap, since it seems to be the successor to requirements.txt. (We stumbled upon this as we were thinking of moving to Kubernetes and didn't know how dependencies were handled there [they aren't, yet, see #21092]. We could install dependencies in our target Docker images using Pipfiles, but submitting a Pipfile with our individual jobs would be a much cleaner solution.) Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user zjffdu commented on the issue: https://github.com/apache/spark/pull/13599 Thanks for the interest in this PR and the info about `Pipfiles`. I think we could support that after this PR gets merged, so that we can provide users more options for virtualenv based on their environment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/13599 @holdenk and @zjffdu, I believe manual tests are okay if it's difficult to write a test. We can manually test and expose this as an experimental feature too. BTW, I believe we can still have some tests to check that, for example, at least the string is properly constructed - https://github.com/apache/spark/pull/13599/files#r175670974? I think that could be enough for now. I happened to look into this multiple times over the past few years, and I think it's better to go ahead than to keep blocking here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/13599 @JoshRosen, I heard that you took a look at this before. Do you have any concerns that should be addressed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193640783 --- Diff: core/src/main/scala/org/apache/spark/barrier/BarrierTaskContext.scala --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.barrier + +import java.util.Properties + +import org.apache.spark.{SparkEnv, TaskContextImpl} +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.internal.Logging +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.util.RpcUtils + +class BarrierTaskContext( --- End diff -- BarrierTaskContextImpl? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193644506 --- Diff: core/src/main/scala/org/apache/spark/barrier/BarrierCoordinator.scala --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.barrier + +import java.util.{Timer, TimerTask} + +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} + +class BarrierCoordinator( +numTasks: Int, +timeout: Long, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint { + + private var epoch = 0 + + private val timer = new Timer("BarrierCoordinator epoch increment timer") + + private val syncRequests = new scala.collection.mutable.ArrayBuffer[RpcCallContext](numTasks) + + private def replyIfGetAllSyncRequest(): Unit = { +if (syncRequests.length == numTasks) { + syncRequests.foreach(_.reply(())) + syncRequests.clear() + epoch += 1 +} + } + + override def receive: PartialFunction[Any, Unit] = { +case IncreaseEpoch(previousEpoch) => + if (previousEpoch == epoch) { +syncRequests.foreach(_.sendFailure(new RuntimeException( + s"The coordinator cannot get all barrier sync requests within $timeout ms."))) --- End diff -- Have we considered to increase incrementally the time out when we can't get all barrier sync requests at an epoch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
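A purely hypothetical sketch of the "incrementally increase the timeout" idea raised above — none of these names exist in the prototype; it only shows what an exponential back-off per failed epoch, capped at a maximum, could look like:

```scala
// Hypothetical helper, not part of the prototype: grow the barrier sync timeout
// exponentially with the number of failed epochs, capped at maxTimeout.
def timeoutForEpoch(baseTimeout: Long, failedEpochs: Int, maxTimeout: Long): Long = {
  val backoff = baseTimeout * (1L << math.min(failedEpochs, 10))
  math.min(backoff, maxTimeout)
}

// With a 30s base: 30s for the first epoch, 60s after one failure, 120s after two, ...
```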
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193658009 --- Diff: core/src/main/scala/org/apache/spark/barrier/BarrierCoordinator.scala --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.barrier + +import java.util.{Timer, TimerTask} + +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} + +class BarrierCoordinator( +numTasks: Int, +timeout: Long, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint { + + private var epoch = 0 + + private val timer = new Timer("BarrierCoordinator epoch increment timer") + + private val syncRequests = new scala.collection.mutable.ArrayBuffer[RpcCallContext](numTasks) + + private def replyIfGetAllSyncRequest(): Unit = { +if (syncRequests.length == numTasks) { + syncRequests.foreach(_.reply(())) + syncRequests.clear() + epoch += 1 +} + } + + override def receive: PartialFunction[Any, Unit] = { +case IncreaseEpoch(previousEpoch) => + if (previousEpoch == epoch) { +syncRequests.foreach(_.sendFailure(new RuntimeException( + s"The coordinator cannot get all barrier sync requests within $timeout ms."))) +syncRequests.clear() +epoch += 1 + } + } + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { +case RequestToSync(epoch) => + if (epoch == this.epoch) { +if (syncRequests.isEmpty) { + val currentEpoch = epoch + timer.schedule(new TimerTask { +override def run(): Unit = { + // self can be null after this RPC endpoint is stopped. + if (self != null) self.send(IncreaseEpoch(currentEpoch)) --- End diff -- Once this epoch fails to sync, the stage will be failed and resubmitted. I think it will begin from new task set, so `IncreaseEpoch` seems useless because it doesn't really increase epoch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193647168 --- Diff: core/src/main/scala/org/apache/spark/barrier/BarrierCoordinator.scala --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.barrier + +import java.util.{Timer, TimerTask} + +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} + +class BarrierCoordinator( +numTasks: Int, +timeout: Long, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint { + + private var epoch = 0 + + private val timer = new Timer("BarrierCoordinator epoch increment timer") + + private val syncRequests = new scala.collection.mutable.ArrayBuffer[RpcCallContext](numTasks) + + private def replyIfGetAllSyncRequest(): Unit = { +if (syncRequests.length == numTasks) { + syncRequests.foreach(_.reply(())) + syncRequests.clear() + epoch += 1 +} + } + + override def receive: PartialFunction[Any, Unit] = { +case IncreaseEpoch(previousEpoch) => + if (previousEpoch == epoch) { +syncRequests.foreach(_.sendFailure(new RuntimeException( + s"The coordinator cannot get all barrier sync requests within $timeout ms."))) +syncRequests.clear() +epoch += 1 + } + } + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { +case RequestToSync(epoch) => + if (epoch == this.epoch) { +if (syncRequests.isEmpty) { + val currentEpoch = epoch + timer.schedule(new TimerTask { +override def run(): Unit = { + // self can be null after this RPC endpoint is stopped. + if (self != null) self.send(IncreaseEpoch(currentEpoch)) +} + }, timeout) +} + +syncRequests += context +replyIfGetAllSyncRequest() + } --- End diff -- ```scala if (epoch == this.epoch) { ... } else { // Received RpcCallContext from failed previousEpoch. context.sendFailure(new RuntimeException( s"The coordinator cannot get all barrier sync requests within $timeout ms."))) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193649314 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -359,17 +368,42 @@ private[spark] class TaskSchedulerImpl( // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { - var launchedAnyTask = false - var launchedTaskAtCurrentMaxLocality = false - for (currentMaxLocality <- taskSet.myLocalityLevels) { -do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( -taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) - launchedAnyTask |= launchedTaskAtCurrentMaxLocality -} while (launchedTaskAtCurrentMaxLocality) - } - if (!launchedAnyTask) { -taskSet.abortIfCompletelyBlacklisted(hostToExecutors) + // Skip the barrier taskSet if the available slots are less than the number of pending tasks. + if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { +// Skip the launch process. --- End diff -- Logging something instead of silently passing? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21494#discussion_r193648185 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1310,6 +1311,44 @@ class DAGScheduler( } } + case failure: TaskFailedReason if task.isBarrier => +// Always fail the current stage and retry all the tasks when a barrier task fail. +val failedStage = stageIdToStage(task.stageId) +logInfo(s"Marking $failedStage (${failedStage.name}) as failed " + + "due to a barrier task failed.") +val message = "Stage failed because a barrier task finished unsuccessfully. " + + s"${failure.toErrorString}" +try { // cancelTasks will fail if a SchedulerBackend does not implement killTask + taskScheduler.cancelTasks(stageId, interruptThread = false) +} catch { + case e: UnsupportedOperationException => +logInfo(s"Could not cancel tasks for stage $stageId", e) --- End diff -- Under barrier execution, will it be a problem if we can not cancel tasks? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193659778 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,115 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for PySpark --- End diff -- @zjffdu, mind if I ask you to describe this as an experimental feature that is very likely to be unstable and still evolving? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user zjffdu commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193664416 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,115 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for PySpark --- End diff -- Thanks @HyukjinKwon , doc is updated --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/13599 **[Test build #91519 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91519/testReport)** for PR 13599 at commit [`3c02852`](https://github.com/apache/spark/commit/3c0285219b45cb2e5b3b7e21f6b9b0fceb72ac62). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13599 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13599 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3830/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
GitHub user ssonker opened a pull request: https://github.com/apache/spark/pull/21505 [SPARK-24457][SQL] Improving performance of stringToTimestamp by caching Calendar instances for input timezones instead of creating new ones every time ## What changes were proposed in this pull request? As of now, the stringToTimestamp function in DateTimeUtils creates a calendar instance on each call. This change maintains a thread-local timezone-to-calendar map and creates just one calendar for each timezone. Whenever a calendar instance is queried for a given timezone, it is looked up in the map, reinitialized and returned. ## How was this patch tested? Using existing test cases. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ssonker/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21505.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21505 commit 84d5a911408411f327b620bb958b996e55264781 Author: Sharad Sonker Date: 2018-06-07T04:56:37Z [SPARK-24457][SQL] Improving performance of stringToTimestamp by caching Calendar instances for input timezones instead of creating new everytime --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21505: [SPARK-24457][SQL] Improving performance of stringToTime...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21505 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193674578 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,24 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { + override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { +mutable.Map[TimeZone, (Calendar, Long)]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +val c = Calendar.getInstance(timeZone) +(c, c.getTimeInMillis) --- End diff -- When you get the calendar instance next time, isn't its time out of date? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193672797 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,115 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for PySpark (This is an experimental feature and may evolve in future version) +For simple PySpark application, we can use `--py-files` to add its dependencies. While for a large PySpark application, +usually you will have many dependencies which may also have transitive dependencies and even some dependencies need to be compiled +first to be installed. In this case `--py-files` is not so convenient. Luckily, in python world we have virtualenv/conda to help create isolated +python work environment. We also implement virtualenv in PySpark (It is only supported in yarn mode for now). User can use this feature +in 2 scenarios: +* Batch mode (submit spark app via spark-submit) +* Interactive mode (PySpark shell or other third party Spark Notebook) + +## Prerequisites +- Each node have virtualenv/conda, python-devel installed +- Each node is internet accessible (for downloading packages) + +## Batch Mode + +In batch mode, user need to specify the additional python packages before launching spark app. There're 2 approaches to specify that: +* Provide a requirement file which contains all the packages for the virtualenv. +* Specify packages via spark configuration `spark.pyspark.virtualenv.packages`. + +Here're several examples: + +{% highlight bash %} +### Setup virtualenv using native virtualenv on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=native" \ +--conf "spark.pyspark.virtualenv.requirements=" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + + +### Setup virtualenv using conda on yarn-client mode +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=conda" \ +--conf "spark.pyspark.virtualenv.requirements=" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + + +### Setup virtualenv using conda on yarn-client mode and specify packages via `spark.pyspark.virtualenv.packages` +bin/spark-submit \ +--master yarn \ +--deploy-mode client \ +--conf "spark.pyspark.virtualenv.enabled=true" \ +--conf "spark.pyspark.virtualenv.type=conda" \ +--conf "spark.pyspark.virtualenv.packages=numpy,pandas" \ +--conf "spark.pyspark.virtualenv.bin.path=" \ + +{% endhighlight %} + +### How to create requirement file ? +Usually before running distributed PySpark job, you need first to run it in local environment. It is encouraged to first create your own virtualenv for your project, so you know what packages you need. After you are confident with your work and want to move it to cluster, you can run the following command to generate the requirement file for virtualenv and conda. +- pip freeze > requirements.txt +- conda list --export > requirements.txt + +## Interactive Mode +In interactive modeï¼user can install python packages at runtime instead of specifying them in requirement file when submitting spark app. +Here are several ways to install packages + +{% highlight python %} +sc.install_packages("numpy") # install the latest numpy --- End diff -- Seems there are tabs here. 
Shall we replace them with spaces? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193674619 --- Diff: python/pyspark/context.py --- @@ -1035,6 +1044,41 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +Install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages +""" +if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": +raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled is set as true") +if isinstance(packages, basestring): +packages = [packages] +# seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. +num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 +dummyRDD = self.parallelize(range(num_executors), num_executors) + +def _run_pip(packages, iterator): +import pip +return pip.main(["install"] + packages) + +# install package on driver first. if installation succeeded, continue the installation +# on executors, otherwise return directly. +if _run_pip(packages, None) != 0: +return + +virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages") +if virtualenvPackages: +self._conf.set("spark.pyspark.virtualenv.packages", virtualenvPackages + ":" + + ":".join(packages)) +else: +self._conf.set("spark.pyspark.virtualenv.packages", ":".join(packages)) + +import functools --- End diff -- Can we move this up within this function? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193673500 --- Diff: docs/submitting-applications.md --- @@ -218,6 +218,115 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors. +# VirtualEnv for PySpark (This is an experimental feature and may evolve in future version) +For simple PySpark application, we can use `--py-files` to add its dependencies. While for a large PySpark application, +usually you will have many dependencies which may also have transitive dependencies and even some dependencies need to be compiled +first to be installed. In this case `--py-files` is not so convenient. Luckily, in python world we have virtualenv/conda to help create isolated +python work environment. We also implement virtualenv in PySpark (It is only supported in yarn mode for now). User can use this feature +in 2 scenarios: +* Batch mode (submit spark app via spark-submit) +* Interactive mode (PySpark shell or other third party Spark Notebook) + --- End diff -- Ah, maybe we can leave a note at the end instead of adding it in the title. ``` Note that this is an experimental feature added from Spark 2.4.0 and may evolve in the future version. ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/13599#discussion_r193674158 --- Diff: python/pyspark/context.py --- @@ -1035,6 +1044,41 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf +def install_packages(self, packages): +""" +Install python packages on all executors and driver through pip. pip will be installed +by default no matter using native virtualenv or conda. So it is guaranteed that pip is +available if virtualenv is enabled. +:param packages: string for single package or a list of string for multiple packages --- End diff -- Shall we add: ``` .. versionadded:: 2.3.0 .. note:: Experimental ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user ssonker commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193675439 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,24 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { + override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { +mutable.Map[TimeZone, (Calendar, Long)]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +val c = Calendar.getInstance(timeZone) +(c, c.getTimeInMillis) --- End diff -- Please refer line 130 for this. Before returning the calendar instance, it is reinitialized to the time it was originally created. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user ssonker commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193675674 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,24 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { + override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { +mutable.Map[TimeZone, (Calendar, Long)]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +val c = Calendar.getInstance(timeZone) +(c, c.getTimeInMillis) --- End diff -- @viirya ^ Does that answer your question, or do you mean something else? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21499: [SPARK-24468][SQL] Handle negative scale when adj...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21499#discussion_r193675689 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala --- @@ -161,13 +161,17 @@ object DecimalType extends AbstractDataType { * This method is used only when `spark.sql.decimalOperations.allowPrecisionLoss` is set to true. */ private[sql] def adjustPrecisionScale(precision: Int, scale: Int): DecimalType = { -// Assumptions: +// Assumption: assert(precision >= scale) -assert(scale >= 0) if (precision <= MAX_PRECISION) { // Adjustment only needed when we exceed max precision DecimalType(precision, scale) +} else if (scale < 0) { + // Decimal can have negative scale (SPARK-24468). In this case, we cannot allow a precision + // loss since we would cause a loss of digits in the integer part. --- End diff -- I am adding it, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
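To make the comment above concrete, here is a small, purely illustrative example (hypothetical numbers, plain `java.math`, not the PR's code) of why lowering precision on a negative-scale decimal changes digits in the integer part rather than in a fractional part:

```scala
import java.math.{BigDecimal => JBigDecimal, BigInteger, MathContext, RoundingMode}

object NegativeScaleExample {
  def main(args: Array[String]): Unit = {
    // scale = -2 means there are no fractional digits at all:
    // the unscaled value 123456 represents 12345600.
    val d = new JBigDecimal(BigInteger.valueOf(123456L), -2)
    println(d)  // 1.23456E+7, i.e. 12345600

    // Forcing a smaller precision has to drop significant *integer* digits,
    // which is the loss the patch refuses to allow when scale < 0.
    println(d.round(new MathContext(4, RoundingMode.DOWN)))  // 1.234E+7, i.e. 12340000
  }
}
```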
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193676413 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,24 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { + override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { +mutable.Map[TimeZone, (Calendar, Long)]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +val c = Calendar.getInstance(timeZone) +(c, c.getTimeInMillis) --- End diff -- Isn't `timeInMillis` also stored when you first update this map entity? So next time you access this calendar, you just set it with the old `timeInMillis`. Isn't it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user ssonker commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193676953 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,24 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { + override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { +mutable.Map[TimeZone, (Calendar, Long)]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +val c = Calendar.getInstance(timeZone) +(c, c.getTimeInMillis) --- End diff -- Yes, correct. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193678440 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,24 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { + override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { +mutable.Map[TimeZone, (Calendar, Long)]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +val c = Calendar.getInstance(timeZone) +(c, c.getTimeInMillis) + }) +c.clear() +c.setTimeInMillis(timeInMillis) --- End diff -- I agree with @viirya 's comment. Do we need to set the value of `System.currentTimeMillis()`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193678459 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a +connection or starting a transaction) be done open after the `open(...)` +method has been called, which signifies that the task is ready to generate data. + +* The lifecycle of the methods are as follows. + +For each partition with ``partition_id``: + +... For each batch/epoch of streaming data with ``epoch_id``: + +... Method ``open(partitionId, epochId)`` is called. + +... If ``open(...)`` returns true, for each row in the partition and +batch/epoch, method ``process(row)`` is called. + +... Method ``close(errorOrNull)`` is called with error (if any) seen while +processing rows. + +Important points to note: + +* The `partitionId` and `epochId` can be used to deduplicate generated data when +failures cause reprocessing of some input data. This depends on the execution +mode of the query. If the streaming query is being executed in the micro-batch +mode, then every partition represented by a unique tuple (partition_id, epoch_id) +is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used +to deduplicate and/or transactionally commit data and achieve exactly-once +guarantees. However, if the streaming query is being executed in the continuous +mode, then this guarantee does not hold and therefore should not be used for +deduplication. + +* The ``close()`` method (if exists) is will be called if `open()` method exists and +returns successfully (irrespective of the return value), except if the Python +crashes in the middle. + +.. note:: Evolving. 
+ +>>> # Print every row using a function +>>> writer = sdf.writeStream.foreach(lambda x: print(x)) +>>> # Print every row using a object with process() method +>>> class RowPrinter: +... def open(self, partition_id, epoch_id): +... print("Opened %d, %d" % (partition_id, epoch_id)) +... return True +... d
[GitHub] spark issue #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21061 Let me think about the implementation to keep the order. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 cc @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user ssonker commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193679978 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,24 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { + override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { +mutable.Map[TimeZone, (Calendar, Long)]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +val c = Calendar.getInstance(timeZone) +(c, c.getTimeInMillis) + }) +c.clear() +c.setTimeInMillis(timeInMillis) --- End diff -- @kiszk Thanks, I'm updating that. BTW, can you please help me understand a scenario where that is absolutely needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18900: [SPARK-21687][SQL] Spark SQL should set createTim...
Github user cxzl25 commented on a diff in the pull request: https://github.com/apache/spark/pull/18900#discussion_r193685282 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala --- @@ -1019,6 +1021,8 @@ private[hive] object HiveClientImpl { compressed = apiPartition.getSd.isCompressed, properties = Option(apiPartition.getSd.getSerdeInfo.getParameters) .map(_.asScala.toMap).orNull), + createTime = apiPartition.getCreateTime.toLong * 1000, + lastAccessTime = apiPartition.getLastAccessTime.toLong * 1000) --- End diff -- Add a comma to the end? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user ssonker commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193686772 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,24 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { + override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { +mutable.Map[TimeZone, (Calendar, Long)]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +val c = Calendar.getInstance(timeZone) +(c, c.getTimeInMillis) + }) +c.clear() +c.setTimeInMillis(timeInMillis) --- End diff -- @kiszk @viirya I've updated the code. Please review. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193687346 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -114,20 +114,19 @@ object DateTimeUtils { } private val threadLocalComputedCalendarsMap = -new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { - override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { -mutable.Map[TimeZone, (Calendar, Long)]() +new ThreadLocal[mutable.Map[TimeZone, Calendar]] { + override def initialValue(): mutable.Map[TimeZone, Calendar] = { +mutable.Map[TimeZone, Calendar]() } } def getCalendar(timeZone: TimeZone): Calendar = { -val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() +val c = threadLocalComputedCalendarsMap.get() .getOrElseUpdate(timeZone, { -val c = Calendar.getInstance(timeZone) -(c, c.getTimeInMillis) +Calendar.getInstance(timeZone) }) c.clear() -c.setTimeInMillis(timeInMillis) +c.setTimeInMillis(System.currentTimeMillis()) --- End diff -- hmm, I think `System.currentTimeMillis()` is UTC-based? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193687670 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -114,20 +114,19 @@ object DateTimeUtils { } private val threadLocalComputedCalendarsMap = -new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { - override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { -mutable.Map[TimeZone, (Calendar, Long)]() +new ThreadLocal[mutable.Map[TimeZone, Calendar]] { + override def initialValue(): mutable.Map[TimeZone, Calendar] = { +mutable.Map[TimeZone, Calendar]() } } def getCalendar(timeZone: TimeZone): Calendar = { -val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() +val c = threadLocalComputedCalendarsMap.get() .getOrElseUpdate(timeZone, { -val c = Calendar.getInstance(timeZone) -(c, c.getTimeInMillis) +Calendar.getInstance(timeZone) }) c.clear() -c.setTimeInMillis(timeInMillis) +c.setTimeInMillis(System.currentTimeMillis()) --- End diff -- oh, Calendar.getTimeInMillis and setTimeInMillis are also UTC-based. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193688565 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,23 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, Calendar]] { + override def initialValue(): mutable.Map[TimeZone, Calendar] = { +mutable.Map[TimeZone, Calendar]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val c = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +Calendar.getInstance(timeZone) + }) +c.clear() --- End diff -- Doesn't `clear` reset the timezone of that `Calendar` instance? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21505: [SPARK-24457][SQL] Improving performance of stringToTime...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21505 We would appreciate it if you could post the performance numbers before and after this PR. It would be good to use the `Benchmark` class. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
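For reference, a rough sketch of the kind of measurement being asked for. This is not the PR's benchmark — Spark's `Benchmark` utility would be the preferred harness — and the `stringToTimestamp` overload used below is assumed from the 2.x code base:

```scala
import java.util.TimeZone
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

object StringToTimestampBench {
  def main(args: Array[String]): Unit = {
    val tz = TimeZone.getTimeZone("America/Los_Angeles")
    val input = UTF8String.fromString("2018-06-07 04:56:37")
    val iters = 1000000

    // Warm up so JIT compilation does not dominate the measurement.
    (1 to 100000).foreach(_ => DateTimeUtils.stringToTimestamp(input, tz))

    val start = System.nanoTime()
    (1 to iters).foreach(_ => DateTimeUtils.stringToTimestamp(input, tz))
    println(s"$iters parses took ${(System.nanoTime() - start) / 1e6} ms")
  }
}
```

Running this once against master and once against the patched branch would give the before/after numbers requested above.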
[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...
Github user DazhuangSu commented on a diff in the pull request: https://github.com/apache/spark/pull/19691#discussion_r193691275 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand( * * The syntax of this command is: * {{{ - * ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE]; + * ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1) + * [, PARTITION (spec2, expr2), ...] [PURGE]; * }}} */ case class AlterTableDropPartitionCommand( tableName: TableIdentifier, -specs: Seq[TablePartitionSpec], +partitions: Seq[(TablePartitionSpec, Seq[Expression])], ifExists: Boolean, purge: Boolean, retainData: Boolean) - extends RunnableCommand { + extends RunnableCommand with PredicateHelper { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val table = catalog.getTableMetadata(tableName) +val resolver = sparkSession.sessionState.conf.resolver DDLUtils.verifyAlterTableType(catalog, table, isView = false) DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION") -val normalizedSpecs = specs.map { spec => - PartitioningUtils.normalizePartitionSpec( -spec, -table.partitionColumnNames, -table.identifier.quotedString, -sparkSession.sessionState.conf.resolver) +val toDrop = partitions.flatMap { partition => + if (partition._1.isEmpty && !partition._2.isEmpty) { +// There are only expressions in this drop condition. +extractFromPartitionFilter(partition._2, catalog, table, resolver) + } else if (!partition._1.isEmpty && partition._2.isEmpty) { +// There are only partitionSpecs in this drop condition. +extractFromPartitionSpec(partition._1, table, resolver) + } else if (!partition._1.isEmpty && !partition._2.isEmpty) { +// This drop condition has both partitionSpecs and expressions. +extractFromPartitionFilter(partition._2, catalog, table, resolver).intersect( --- End diff -- hi, @mgaido91 there is one problem after I changed the syntax, when i run sql `DROP PARTITION (p >=2)` it throws `org.apache.spark.sql.AnalysisException: cannot resolve 'p' given input columns: []` I'm trying to find a way to figure it out. By the way, is a syntax like `((partitionVal (',' partitionVal)*) | (expression (',' expression)*))` legal? Because I wrote a antlr4 syntax test, but it didn't work as I supposed. Besides, I was wrong that day. I think the if conditions won't be inefficient if there is a lot of partitions. it maybe inefficient if there are a lot of dropPartitionSpec which I don't think can happen easily. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
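A minimal way to reproduce the resolution failure described above (assuming a Hive-backed table and a `SparkSession` named `spark`; the table and column names are made up):

```scala
// Hypothetical repro of the error quoted above, under the PR's proposed syntax.
spark.sql("CREATE TABLE t (c INT) PARTITIONED BY (p INT)")
spark.sql("ALTER TABLE t ADD PARTITION (p=1) PARTITION (p=2) PARTITION (p=3)")

// With the patch this is expected to drop p=2 and p=3, but currently throws:
// org.apache.spark.sql.AnalysisException: cannot resolve 'p' given input columns: []
spark.sql("ALTER TABLE t DROP PARTITION (p >= 2)")
```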
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user ssonker commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193694372 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,23 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, Calendar]] { + override def initialValue(): mutable.Map[TimeZone, Calendar] = { +mutable.Map[TimeZone, Calendar]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val c = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +Calendar.getInstance(timeZone) + }) +c.clear() --- End diff -- Nope. It clears all the ```fields``` and zone is not a field. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21499 **[Test build #91520 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91520/testReport)** for PR 21499 at commit [`7f24206`](https://github.com/apache/spark/commit/7f242064ade10c02733e29a132aec5fc9af9b887). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193696451 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,23 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, Calendar]] { --- End diff -- Do we need to keep Calendar for many timezone? Since `getCalendar` takes a time zone input, we can just keep one Calendar instance, and set it with given timezone in `getCalendar`. WDYT? Regarding performance, is there big difference? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
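A minimal sketch of the single-cached-instance alternative suggested above — the object and layout here are hypothetical, not the PR's code; it keeps one `Calendar` per thread and re-targets it on each call:

```scala
import java.util.{Calendar, TimeZone}

object CalendarCache {
  private val threadLocalCalendar = new ThreadLocal[Calendar] {
    override def initialValue(): Calendar = Calendar.getInstance()
  }

  def getCalendar(timeZone: TimeZone): Calendar = {
    val c = threadLocalCalendar.get()
    c.clear()                                      // drop any stale field values
    c.setTimeZone(timeZone)                        // re-target the single cached instance
    c.setTimeInMillis(System.currentTimeMillis())  // refresh to "now" (epoch millis are UTC-based)
    c
  }
}
```

Whether this is measurably faster than one cached `Calendar` per time zone is exactly the performance question raised in the thread.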
[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21499 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3831/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21499 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21502: [SPARK-22575][SQL] Add destroy to Dataset
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21502#discussion_r193724774 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala --- @@ -152,6 +152,26 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { } } + test("SPARK-22575: remove allocated blocks when they are not needed anymore") { +val df1 = Seq((1, "4"), (2, "2")).toDF("key", "value") +val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value") +val df3 = df1.join(broadcast(df2), Seq("key"), "inner") +val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect { + case b: BroadcastHashJoinExec => b +}.size +assert(numBroadCastHashJoin > 0) +df3.collect() +df3.destroy() +val blockManager = sparkContext.env.blockManager +val blocks = blockManager.getMatchingBlockIds(blockId => { + blockId.isBroadcast && blockManager.getStatus(blockId).get.storageLevel.deserialized +}).distinct +val blockValues = blocks.flatMap { id => + blockManager.getSingle[Any](id) +} --- End diff -- This may be the root cause of the unstable UT failure: the block can't be deleted immediately. I added a sleep and it passes every time; you can have a try. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
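If the flakiness really is a race with asynchronous block removal, a polling wait is usually more robust than a fixed sleep. A hedged sketch, assuming the suite mixes in ScalaTest's `Eventually` and that `blockManager` is the one obtained in the test above:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Poll for up to 10 seconds instead of sleeping a fixed amount.
eventually(timeout(10.seconds), interval(100.milliseconds)) {
  assert(blockManager.getMatchingBlockIds(_.isBroadcast).isEmpty)
}
```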
[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21467 **[Test build #91521 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91521/testReport)** for PR 21467 at commit [`9724640`](https://github.com/apache/spark/commit/9724640c534f3f1600ae3c37988479e7d0500cd0). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18900: [SPARK-21687][SQL] Spark SQL should set createTim...
Github user debugger87 commented on a diff in the pull request: https://github.com/apache/spark/pull/18900#discussion_r193730957 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala --- @@ -1019,6 +1021,8 @@ private[hive] object HiveClientImpl { compressed = apiPartition.getSd.isCompressed, properties = Option(apiPartition.getSd.getSerdeInfo.getParameters) .map(_.asScala.toMap).orNull), + createTime = apiPartition.getCreateTime.toLong * 1000, + lastAccessTime = apiPartition.getLastAccessTime.toLong * 1000) --- End diff -- @cxzl25 yeah, it's my mistake; I will fix it --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18900: [SPARK-21687][SQL] Spark SQL should set createTime for H...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18900 **[Test build #91522 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91522/testReport)** for PR 18900 at commit [`b0846c3`](https://github.com/apache/spark/commit/b0846c39a94d729ec0324cc72b98861da7c073c7). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21469 **[Test build #91523 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91523/testReport)** for PR 21469 at commit [`3c80cad`](https://github.com/apache/spark/commit/3c80cad32c056a24a7f5ffd7ab0ae3f7e096a62d). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21467 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91521/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21467 **[Test build #91521 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91521/testReport)** for PR 21467 at commit [`9724640`](https://github.com/apache/spark/commit/9724640c534f3f1600ae3c37988479e7d0500cd0). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21467 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/13599 **[Test build #91519 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91519/testReport)** for PR 13599 at commit [`3c02852`](https://github.com/apache/spark/commit/3c0285219b45cb2e5b3b7e21f6b9b0fceb72ac62). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean)` * ` class DriverEndpoint(override val rpcEnv: RpcEnv)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13599 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91519/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13599 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193740695 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a +connection or starting a transaction) be done open after the `open(...)` +method has been called, which signifies that the task is ready to generate data. + +* The lifecycle of the methods are as follows. + +For each partition with ``partition_id``: + +... For each batch/epoch of streaming data with ``epoch_id``: + +... Method ``open(partitionId, epochId)`` is called. + +... If ``open(...)`` returns true, for each row in the partition and +batch/epoch, method ``process(row)`` is called. + +... Method ``close(errorOrNull)`` is called with error (if any) seen while +processing rows. + +Important points to note: + +* The `partitionId` and `epochId` can be used to deduplicate generated data when +failures cause reprocessing of some input data. This depends on the execution +mode of the query. If the streaming query is being executed in the micro-batch +mode, then every partition represented by a unique tuple (partition_id, epoch_id) +is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used +to deduplicate and/or transactionally commit data and achieve exactly-once +guarantees. However, if the streaming query is being executed in the continuous +mode, then this guarantee does not hold and therefore should not be used for +deduplication. + +* The ``close()`` method (if exists) is will be called if `open()` method exists and +returns successfully (irrespective of the return value), except if the Python +crashes in the middle. + +.. note:: Evolving. 
+ +>>> # Print every row using a function +>>> writer = sdf.writeStream.foreach(lambda x: print(x)) +>>> # Print every row using a object with process() method +>>> class RowPrinter: +... def open(self, partition_id, epoch_id): +... print("Opened %d, %d" % (partition_id, epoch_id)) +... return True +... d
[GitHub] spark pull request #21502: [SPARK-22575][SQL] Add destroy to Dataset
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21502#discussion_r193742604 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala --- @@ -152,6 +152,26 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { } } + test("SPARK-22575: remove allocated blocks when they are not needed anymore") { +val df1 = Seq((1, "4"), (2, "2")).toDF("key", "value") +val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value") +val df3 = df1.join(broadcast(df2), Seq("key"), "inner") +val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect { + case b: BroadcastHashJoinExec => b +}.size +assert(numBroadCastHashJoin > 0) +df3.collect() +df3.destroy() +val blockManager = sparkContext.env.blockManager +val blocks = blockManager.getMatchingBlockIds(blockId => { + blockId.isBroadcast && blockManager.getStatus(blockId).get.storageLevel.deserialized +}).distinct +val blockValues = blocks.flatMap { id => + blockManager.getSingle[Any](id) +} --- End diff -- I run the test 1 times and I cannot reproduce the issue locally. Can you? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193736960 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala --- @@ -117,101 +131,170 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { } def makeSortMergeJoin( -leftKeys: Seq[Expression], -rightKeys: Seq[Expression], -boundCondition: Option[Expression], -leftPlan: SparkPlan, -rightPlan: SparkPlan) = { - val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition, -leftPlan, rightPlan) + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + boundCondition: Option[Expression], + rangeConditions: Seq[BinaryComparison], + leftPlan: SparkPlan, + rightPlan: SparkPlan) = { + val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, rangeConditions, +boundCondition, leftPlan, rightPlan) EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin) } -test(s"$testName using BroadcastHashJoin (build=left)") { - extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) => -withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { - checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) => -makeBroadcastHashJoin( - leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildLeft), -expectedAnswer.map(Row.fromTuple), -sortAnswers = true) +val configOptions = List( + ("spark.sql.codegen.wholeStage", "true"), + ("spark.sql.codegen.wholeStage", "false")) + +// Disabling these because the code would never follow this path in case of a inner range join +if (!expectRangeJoin) { + var counter = 1 --- End diff -- If you want to avoid a `var`, just `configOptions.zipWithIndex.foreach { case ((config, confValue), counter) =>`. Just a tiny bit more idiomatic. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
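A tiny self-contained illustration of that suggestion (the `println` body is just a placeholder for the real test code):

```scala
// Iterate with an index, no var needed.
val configOptions = List(
  ("spark.sql.codegen.wholeStage", "true"),
  ("spark.sql.codegen.wholeStage", "false"))

configOptions.zipWithIndex.foreach { case ((config, confValue), counter) =>
  println(s"run #$counter: $config=$confValue")  // placeholder for the test body
}
```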
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193734550 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { if (joinKeys.nonEmpty) { val (leftKeys, rightKeys) = joinKeys.unzip -logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys") -Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) +// Find any simple range expressions between two columns +// (and involving only those two columns) of the two tables being joined, +// which are not used in the equijoin expressions, +// and which can be used for secondary sort optimizations. +// rangePreds will contain the original expressions to be filtered out later. +val rangePreds: mutable.Set[Expression] = mutable.Set.empty +var rangeConditions: Seq[BinaryComparison] = + if (SQLConf.get.useSmjInnerRangeOptimization) { +otherPredicates.flatMap { + case p@LessThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map { +case true => rangePreds.add(p); GreaterThan(r, l) +case false => rangePreds.add(p); p + } + case p@LessThanOrEqual(l, r) => +checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); GreaterThanOrEqual(r, l) + case false => rangePreds.add(p); p +} + case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map { +case true => rangePreds.add(p); LessThan(r, l) +case false => rangePreds.add(p); p + } + case p@GreaterThanOrEqual(l, r) => +checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); LessThanOrEqual(r, l) + case false => rangePreds.add(p); p +} + case _ => None +} + } else { +Nil + } + +// Only using secondary join optimization when both lower and upper conditions +// are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x) +if(rangeConditions.size != 2 || +// Looking for one < and one > comparison: +rangeConditions.filter(x => x.isInstanceOf[LessThan] || --- End diff -- Instead of checking `.size == 0`, something like `rangeConditions.forall(... not instance of either ...)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
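Sketched out, the suggestion could look roughly like this; extracting it into a helper and the helper's name are my own, only the predicate style is the point:

```scala
import org.apache.spark.sql.catalyst.expressions._

// "Exactly one lower bound and one upper bound" without counting filtered lists.
def isValidRangePair(rangeConditions: Seq[BinaryComparison]): Boolean = {
  val hasUpper = rangeConditions.exists(c => c.isInstanceOf[LessThan] || c.isInstanceOf[LessThanOrEqual])
  val hasLower = rangeConditions.exists(c => c.isInstanceOf[GreaterThan] || c.isInstanceOf[GreaterThanOrEqual])
  rangeConditions.size == 2 && hasUpper && hasLower
}
```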
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193735061 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { if (joinKeys.nonEmpty) { val (leftKeys, rightKeys) = joinKeys.unzip -logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys") -Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) +// Find any simple range expressions between two columns +// (and involving only those two columns) of the two tables being joined, +// which are not used in the equijoin expressions, +// and which can be used for secondary sort optimizations. +// rangePreds will contain the original expressions to be filtered out later. +val rangePreds: mutable.Set[Expression] = mutable.Set.empty --- End diff -- I tend to prefer `val rangePreds = mutable.Set.empty[Expression]` as it's shorter, but that's just taste --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193735681 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { if (joinKeys.nonEmpty) { val (leftKeys, rightKeys) = joinKeys.unzip -logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys") -Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) +// Find any simple range expressions between two columns +// (and involving only those two columns) of the two tables being joined, +// which are not used in the equijoin expressions, +// and which can be used for secondary sort optimizations. +// rangePreds will contain the original expressions to be filtered out later. +val rangePreds: mutable.Set[Expression] = mutable.Set.empty +var rangeConditions: Seq[BinaryComparison] = + if (SQLConf.get.useSmjInnerRangeOptimization) { +otherPredicates.flatMap { + case p@LessThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map { +case true => rangePreds.add(p); GreaterThan(r, l) +case false => rangePreds.add(p); p + } + case p@LessThanOrEqual(l, r) => +checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); GreaterThanOrEqual(r, l) + case false => rangePreds.add(p); p +} + case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map { +case true => rangePreds.add(p); LessThan(r, l) +case false => rangePreds.add(p); p + } + case p@GreaterThanOrEqual(l, r) => +checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); LessThanOrEqual(r, l) + case false => rangePreds.add(p); p +} + case _ => None +} + } else { +Nil + } + +// Only using secondary join optimization when both lower and upper conditions +// are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x) +if(rangeConditions.size != 2 || +// Looking for one < and one > comparison: +rangeConditions.filter(x => x.isInstanceOf[LessThan] || + x.isInstanceOf[LessThanOrEqual]).size == 0 || +rangeConditions.filter(x => x.isInstanceOf[GreaterThan] || + x.isInstanceOf[GreaterThanOrEqual]).size == 0 || +// Check if both comparisons reference the same columns: +rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 || +rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) { + logDebug("Inner range optimization conditions not met. Clearing range conditions") + rangeConditions = Nil + rangePreds.clear() +} + +Some((joinType, leftKeys, rightKeys, rangeConditions, + otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right)) } else { None } case _ => None } + + /** + * Checks if l and r are valid range conditions: + * - l and r expressions should both contain a single reference to one and the same column. + * - the referenced column should not be part of joinKeys + * If these conditions are not met, the function returns None. + * + * Otherwise, the function checks if the left plan contains l expression and the right plan + * contains r expression. If the expressions need to be switched, the function returns Some(true) + * and Some(false) otherwise. 
+ */ + private def checkRangeConditions(l : Expression, r : Expression, + left : LogicalPlan, right : LogicalPlan, + joinKeys : Seq[(Expression, Expression)]) = { +val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq) +if(lattrs.size != 1 || rattrs.size != 1) { + None +} +else if (canEvaluate(l, left) && canEvaluate(r, right)) { --- End diff -- Nit: pull else onto previous line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193733146 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { if (joinKeys.nonEmpty) { val (leftKeys, rightKeys) = joinKeys.unzip -logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys") -Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) +// Find any simple range expressions between two columns +// (and involving only those two columns) of the two tables being joined, +// which are not used in the equijoin expressions, +// and which can be used for secondary sort optimizations. +// rangePreds will contain the original expressions to be filtered out later. +val rangePreds: mutable.Set[Expression] = mutable.Set.empty +var rangeConditions: Seq[BinaryComparison] = + if (SQLConf.get.useSmjInnerRangeOptimization) { +otherPredicates.flatMap { + case p@LessThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map { +case true => rangePreds.add(p); GreaterThan(r, l) +case false => rangePreds.add(p); p + } + case p@LessThanOrEqual(l, r) => +checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); GreaterThanOrEqual(r, l) + case false => rangePreds.add(p); p +} + case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map { +case true => rangePreds.add(p); LessThan(r, l) +case false => rangePreds.add(p); p + } + case p@GreaterThanOrEqual(l, r) => +checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); LessThanOrEqual(r, l) + case false => rangePreds.add(p); p +} + case _ => None +} + } else { +Nil + } + +// Only using secondary join optimization when both lower and upper conditions +// are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x) +if(rangeConditions.size != 2 || --- End diff -- Nit: space after "if" here and elsewhere --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193743605 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1205,6 +1205,19 @@ object SQLConf { .booleanConf .createWithDefault(true) + val USE_SMJ_INNER_RANGE_OPTIMIZATION = --- End diff -- Yes, at best make this internal. Are there conditions where you would not want to apply this? Is it just a safety valve? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
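For reference, making a SQLConf entry internal is a one-liner on the config builder. A sketch below, meant as a fragment inside `object SQLConf`; the key string, doc text and default are illustrative assumptions, since the diff only shows the `val` name:

```scala
val USE_SMJ_INNER_RANGE_OPTIMIZATION =
  buildConf("spark.sql.join.smjInnerRangeOptimization.enabled")
    .internal()  // hides the flag from user-facing config documentation
    .doc("When true, sort-merge join applies the inner range optimization. " +
      "Acts as a safety valve: disable it if the optimization misbehaves on a query.")
    .booleanConf
    .createWithDefault(true)
```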
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193736438 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala --- @@ -70,27 +70,41 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { (3, 2) ).toDF("a", "b") + private lazy val rangeTestData1 = Seq( +(1, 3), (1, 4), (1, 7), (1, 8), (1, 10), +(2, 1), (2, 2), (2, 3), (2, 8), +(3, 1), (3, 2), (3, 3), (3, 5), +(4, 1), (4, 2), (4, 3) + ).toDF("a", "b") + + private lazy val rangeTestData2 = Seq( +(1, 1), (1, 2), (1, 2), (1, 3), (1, 5), (1, 7), (1, 20), +(2, 1), (2, 2), (2, 3), (2, 5), (2, 6), +(3, 3), (3, 6) + ).toDF("a", "b") + // Note: the input dataframes and expression must be evaluated lazily because // the SQLContext should be used only within a test to keep SQL tests stable private def testInnerJoin( - testName: String, - leftRows: => DataFrame, - rightRows: => DataFrame, - condition: () => Expression, - expectedAnswer: Seq[Product]): Unit = { + testName: String, + leftRows: => DataFrame, + rightRows: => DataFrame, + condition: () => Expression, + expectedAnswer: Seq[Product], + expectRangeJoin: Boolean = false): Unit = { def extractJoinParts(): Option[ExtractEquiJoinKeys.ReturnType] = { val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, Some(condition())) ExtractEquiJoinKeys.unapply(join) } def makeBroadcastHashJoin( -leftKeys: Seq[Expression], -rightKeys: Seq[Expression], -boundCondition: Option[Expression], -leftPlan: SparkPlan, -rightPlan: SparkPlan, -side: BuildSide) = { + leftKeys: Seq[Expression], --- End diff -- (Undo this whitespace change and the next one) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193737191 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala --- @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.serializer.SerializerManager +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer +import org.apache.spark.storage.BlockManager + +/** + * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array + * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which + * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is + * excessive memory consumption). Setting these threshold involves following trade-offs: + * + * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory + * than is available, resulting in OOM. + * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to + * excessive disk writes. This may lead to a performance regression compared to the normal case + * of using an [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class InMemoryUnsafeRowQueue( --- End diff -- No way to avoid making a custom queue implementation here? is it messier without such a structure? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193735968 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { if (joinKeys.nonEmpty) { val (leftKeys, rightKeys) = joinKeys.unzip -logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys") -Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) +// Find any simple range expressions between two columns +// (and involving only those two columns) of the two tables being joined, +// which are not used in the equijoin expressions, +// and which can be used for secondary sort optimizations. +// rangePreds will contain the original expressions to be filtered out later. +val rangePreds: mutable.Set[Expression] = mutable.Set.empty +var rangeConditions: Seq[BinaryComparison] = + if (SQLConf.get.useSmjInnerRangeOptimization) { +otherPredicates.flatMap { + case p@LessThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map { +case true => rangePreds.add(p); GreaterThan(r, l) +case false => rangePreds.add(p); p + } + case p@LessThanOrEqual(l, r) => +checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); GreaterThanOrEqual(r, l) + case false => rangePreds.add(p); p +} + case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, right, joinKeys).map { +case true => rangePreds.add(p); LessThan(r, l) +case false => rangePreds.add(p); p + } + case p@GreaterThanOrEqual(l, r) => +checkRangeConditions(l, r, left, right, joinKeys).map { + case true => rangePreds.add(p); LessThanOrEqual(r, l) + case false => rangePreds.add(p); p +} + case _ => None +} + } else { +Nil + } + +// Only using secondary join optimization when both lower and upper conditions +// are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x) +if(rangeConditions.size != 2 || +// Looking for one < and one > comparison: +rangeConditions.filter(x => x.isInstanceOf[LessThan] || + x.isInstanceOf[LessThanOrEqual]).size == 0 || +rangeConditions.filter(x => x.isInstanceOf[GreaterThan] || + x.isInstanceOf[GreaterThanOrEqual]).size == 0 || +// Check if both comparisons reference the same columns: +rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 || +rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) { + logDebug("Inner range optimization conditions not met. Clearing range conditions") + rangeConditions = Nil + rangePreds.clear() +} + +Some((joinType, leftKeys, rightKeys, rangeConditions, + otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right)) } else { None } case _ => None } + + /** + * Checks if l and r are valid range conditions: + * - l and r expressions should both contain a single reference to one and the same column. + * - the referenced column should not be part of joinKeys + * If these conditions are not met, the function returns None. + * + * Otherwise, the function checks if the left plan contains l expression and the right plan + * contains r expression. If the expressions need to be switched, the function returns Some(true) + * and Some(false) otherwise. + */ + private def checkRangeConditions(l : Expression, r : Expression, + left : LogicalPlan, right : LogicalPlan, + joinKeys : Seq[(Expression, Expression)]) = { --- End diff -- For clarity add a return type to this method. `Option[Boolean]`? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user zecevicp commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193751918 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1205,6 +1205,19 @@ object SQLConf { .booleanConf .createWithDefault(true) + val USE_SMJ_INNER_RANGE_OPTIMIZATION = --- End diff -- It's just a safety valve. In case there are some queries that I don't foresee now where this could get in the way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user zecevicp commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193753823 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala --- @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.serializer.SerializerManager +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer +import org.apache.spark.storage.BlockManager + +/** + * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array + * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which + * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is + * excessive memory consumption). Setting these threshold involves following trade-offs: + * + * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory + * than is available, resulting in OOM. + * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to + * excessive disk writes. This may lead to a performance regression compared to the normal case + * of using an [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class InMemoryUnsafeRowQueue( --- End diff -- A queue is needed here because it's a moving window instead of a fixed block of rows. Maybe I missed an existing class that could do this easily so I'll take another look. But, I believe any alternative would indeed be messier. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
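To illustrate what "moving window" means here, a toy sketch using a `mutable.Queue` over an ascending key stream (names, types and bounds are simplified assumptions; the real buffer in the PR also has to spill to disk):

```scala
import scala.collection.mutable

// Keep only keys in [lower, upper] as the window slides over the buffered side.
def advance(
    window: mutable.Queue[Long],
    stream: BufferedIterator[Long],
    lower: Long,
    upper: Long): Unit = {
  while (window.nonEmpty && window.head < lower) window.dequeue()               // evict below the range
  while (stream.hasNext && stream.head <= upper) window.enqueue(stream.next())  // admit up to the bound
}
```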
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user zecevicp commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193753965 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala --- @@ -117,101 +131,170 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { } def makeSortMergeJoin( -leftKeys: Seq[Expression], -rightKeys: Seq[Expression], -boundCondition: Option[Expression], -leftPlan: SparkPlan, -rightPlan: SparkPlan) = { - val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition, -leftPlan, rightPlan) + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + boundCondition: Option[Expression], + rangeConditions: Seq[BinaryComparison], + leftPlan: SparkPlan, + rightPlan: SparkPlan) = { + val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, rangeConditions, +boundCondition, leftPlan, rightPlan) EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin) } -test(s"$testName using BroadcastHashJoin (build=left)") { - extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) => -withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { - checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) => -makeBroadcastHashJoin( - leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, joins.BuildLeft), -expectedAnswer.map(Row.fromTuple), -sortAnswers = true) +val configOptions = List( + ("spark.sql.codegen.wholeStage", "true"), + ("spark.sql.codegen.wholeStage", "false")) + +// Disabling these because the code would never follow this path in case of a inner range join +if (!expectRangeJoin) { + var counter = 1 --- End diff -- OK, will do that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user zecevicp commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193754271 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper { if (joinKeys.nonEmpty) { val (leftKeys, rightKeys) = joinKeys.unzip -logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys") -Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right)) +// Find any simple range expressions between two columns +// (and involving only those two columns) of the two tables being joined, +// which are not used in the equijoin expressions, +// and which can be used for secondary sort optimizations. +// rangePreds will contain the original expressions to be filtered out later. +val rangePreds: mutable.Set[Expression] = mutable.Set.empty --- End diff -- I think you're right. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21499 **[Test build #91520 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91520/testReport)** for PR 21499 at commit [`7f24206`](https://github.com/apache/spark/commit/7f242064ade10c02733e29a132aec5fc9af9b887). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21499 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91520/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21499 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql
Github user DylanGuedes commented on a diff in the pull request: https://github.com/apache/spark/pull/21045#discussion_r193759057 --- Diff: python/pyspark/sql/functions.py --- @@ -2394,6 +2394,23 @@ def array_repeat(col, count): return Column(sc._jvm.functions.array_repeat(_to_java_column(col), count)) +@since(2.4) +def zip(*cols): +""" +Collection function: Merge two columns into one, such that the M-th element of the N-th +argument will be the N-th field of the M-th output element. + +:param cols: columns in input + +>>> from pyspark.sql.functions import zip as spark_zip --- End diff -- I think that we should stick with something related to zip (such as "zip_arrays" or "zip_lists") for "compatibility naming" with other APIs/languages (`Enum.zip` in Elixir and `zip` in Scala, for instance). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21502 **[Test build #91524 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91524/testReport)** for PR 21502 at commit [`789168e`](https://github.com/apache/spark/commit/789168e147615c50cfd67ba959ba1d43afb00ccf). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21502 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91524/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21502 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21502 **[Test build #91524 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91524/testReport)** for PR 21502 at commit [`789168e`](https://github.com/apache/spark/commit/789168e147615c50cfd67ba959ba1d43afb00ccf). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193762830 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala --- @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.serializer.SerializerManager +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer +import org.apache.spark.storage.BlockManager + +/** + * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array + * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which + * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is + * excessive memory consumption). Setting these threshold involves following trade-offs: + * + * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory + * than is available, resulting in OOM. + * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to + * excessive disk writes. This may lead to a performance regression compared to the normal case + * of using an [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class InMemoryUnsafeRowQueue( +taskMemoryManager: TaskMemoryManager, +blockManager: BlockManager, +serializerManager: SerializerManager, +taskContext: TaskContext, +initialSize: Int, +pageSizeBytes: Long, +numRowsInMemoryBufferThreshold: Int, +numRowsSpillThreshold: Int) + extends ExternalAppendOnlyUnsafeRowArray(taskMemoryManager, + blockManager, + serializerManager, + taskContext, + initialSize, + pageSizeBytes, + numRowsInMemoryBufferThreshold, + numRowsSpillThreshold) { + + def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) { +this( + TaskContext.get().taskMemoryManager(), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + TaskContext.get(), + 1024, + SparkEnv.get.memoryManager.pageSizeBytes, + numRowsInMemoryBufferThreshold, + numRowsSpillThreshold) + } + + private val initialSizeOfInMemoryBuffer = +Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold) + + private val inMemoryQueue = if (initialSizeOfInMemoryBuffer > 0) { +new mutable.Queue[UnsafeRow]() + } else { +null + } + +// private var spillableArray: UnsafeExternalSorter = _ --- End diff -- nit: Is this comment necessary? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user zecevicp commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193763364 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala --- @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.serializer.SerializerManager +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer +import org.apache.spark.storage.BlockManager + +/** + * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array + * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which + * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is + * excessive memory consumption). Setting these threshold involves following trade-offs: + * + * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory + * than is available, resulting in OOM. + * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to + * excessive disk writes. This may lead to a performance regression compared to the normal case + * of using an [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class InMemoryUnsafeRowQueue( +taskMemoryManager: TaskMemoryManager, +blockManager: BlockManager, +serializerManager: SerializerManager, +taskContext: TaskContext, +initialSize: Int, +pageSizeBytes: Long, +numRowsInMemoryBufferThreshold: Int, +numRowsSpillThreshold: Int) + extends ExternalAppendOnlyUnsafeRowArray(taskMemoryManager, + blockManager, + serializerManager, + taskContext, + initialSize, + pageSizeBytes, + numRowsInMemoryBufferThreshold, + numRowsSpillThreshold) { + + def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) { +this( + TaskContext.get().taskMemoryManager(), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + TaskContext.get(), + 1024, + SparkEnv.get.memoryManager.pageSizeBytes, + numRowsInMemoryBufferThreshold, + numRowsSpillThreshold) + } + + private val initialSizeOfInMemoryBuffer = +Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold) + + private val inMemoryQueue = if (initialSizeOfInMemoryBuffer > 0) { +new mutable.Queue[UnsafeRow]() + } else { +null + } + +// private var spillableArray: UnsafeExternalSorter = _ --- End diff -- No, it's not. 
Thank you --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21502 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21502 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3832/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21506 [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider ## What changes were proposed in this pull request? This patch measures and logs the elapsed time for each operation that communicates with the file system (mostly remote HDFS in production) in HDFSBackedStateStoreProvider, to help investigate any latency issues. ## How was this patch tested? Manually tested. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24485 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21506.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21506 commit d84f98fc978262f4165f78b3b223b8bb3151f735 Author: Jungtaek Lim Date: 2018-06-07T14:14:46Z [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
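For readers skimming the thread, the shape of such instrumentation is roughly the following; this is a hedged sketch with invented names, not necessarily how the PR implements it:

```scala
import org.apache.spark.internal.Logging

trait TimedFsOps extends Logging {
  /** Run `body`, log how long it took, and return its result. */
  def logTimeTaken[T](opName: String)(body: => T): T = {
    val start = System.nanoTime()
    try body finally {
      val elapsedMs = (System.nanoTime() - start) / 1000000
      logDebug(s"$opName took $elapsedMs ms")
    }
  }
}

// e.g. inside the provider (hypothetical call site):
// logTimeTaken("reading snapshot file") { readSnapshotFile(version) }
```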
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21506 **[Test build #91525 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91525/testReport)** for PR 21506 at commit [`d84f98f`](https://github.com/apache/spark/commit/d84f98fc978262f4165f78b3b223b8bb3151f735). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21506 There are plenty of other debug messages which might hide the log messages added in this patch. Would we want to log them at INFO instead of DEBUG? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21469 **[Test build #91523 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91523/testReport)** for PR 21469 at commit [`3c80cad`](https://github.com/apache/spark/commit/3c80cad32c056a24a7f5ffd7ab0ae3f7e096a62d). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21469 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21469 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91523/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21506 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 retest this, please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21469 **[Test build #91526 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91526/testReport)** for PR 21469 at commit [`3c80cad`](https://github.com/apache/spark/commit/3c80cad32c056a24a7f5ffd7ab0ae3f7e096a62d). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21501: [SPARK-15064][ML] Locale support in StopWordsRemo...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21501#discussion_r193779131 --- Diff: python/pyspark/ml/feature.py --- @@ -2582,25 +2582,27 @@ class StopWordsRemover(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadabl typeConverter=TypeConverters.toListString) caseSensitive = Param(Params._dummy(), "caseSensitive", "whether to do a case sensitive " + "comparison over the stop words", typeConverter=TypeConverters.toBoolean) +locale = Param(Params._dummy(), "locale", "locale of the input. ignored when case sensitive is false", --- End diff -- `false` -> `true`. (Copy the param doc from Scala) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21501: [SPARK-15064][ML] Locale support in StopWordsRemo...
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21501#discussion_r193777474 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala --- @@ -84,7 +86,31 @@ class StopWordsRemover @Since("1.5.0") (@Since("1.5.0") override val uid: String @Since("1.5.0") def getCaseSensitive: Boolean = $(caseSensitive) - setDefault(stopWords -> StopWordsRemover.loadDefaultStopWords("english"), caseSensitive -> false) + /** + * [[https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html Locale]] of the input for case insensitive + * matching. Ignored when [[caseSensitive]] is true. + * Default: Locale.getDefault.toString + * @see `StopWordsRemover.loadDefaultStopWords()` + * @group param + */ + @Since("2.4.0") + val locale: Param[String] = new Param[String](this, "locale", +"Locale of the input for case insensitive matching. Ignored when caseSensitive is false.", --- End diff -- `false` -> `true` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
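As a usage-level illustration of the param under review, a hedged Scala snippet; `setLocale` is an assumption mirroring the param name in the diff, and the column names are placeholders:

```scala
import org.apache.spark.ml.feature.StopWordsRemover

// Case-insensitive stop-word matching is locale-sensitive (e.g. Turkish dotted/dotless i),
// which is what the new locale param controls.
val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")
  .setCaseSensitive(false)
  .setLocale("tr")  // assumed setter; the diff only shows the param and its doc
```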