[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21482
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91518/
Test FAILed.


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21482
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21482
  
**[Test build #91518 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91518/testReport)**
 for PR 21482 at commit 
[`6a4d46e`](https://github.com/apache/spark/commit/6a4d46e0a9ab403364e26a7b8f16c9ca94c31a2e).
 * This patch **fails due to an unknown error code, -9**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21482
  
**[Test build #91517 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91517/testReport)**
 for PR 21482 at commit 
[`f240fdf`](https://github.com/apache/spark/commit/f240fdf3a410e2fdec1fa668bc0218ac61078423).
 * This patch **fails due to an unknown error code, -9**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21482
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91517/
Test FAILed.


---




[GitHub] spark issue #21482: [SPARK-24393][SQL] SQL builtin: isinf

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21482
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2018-06-07 Thread kokes
Github user kokes commented on the issue:

https://github.com/apache/spark/pull/13599
  
Hi, thanks for all the work on this! I see requirements.txt mentioned here 
and there and, browsing this and other JIRAs, it seems to be the proposed way 
to specify dependencies in PySpark. As you probably know, the community has 
rallied around [Pipfile](https://github.com/pypa/pipfile)s as a replacement for 
requirements.txt.

This has a few upsides (including a lock file), the main one being that the 
reference implementation ([Pipenv](http://pipenv.org/)) allows for installing 
packages into a new virtualenv directly, without having to activate it or run 
other commands. So that combines dependency management, reproducibility, and 
environment isolation.

(Also, if one doesn't want said packages to be installed in a venv, there's 
an argument to install them system-wide.)

I'm not proposing this PR gets extended to support Pipfiles, I just wanted 
to ask if this has been considered and is on the roadmap, since it seems to be 
the successor to requirements.txt.

(We stumbled upon this as we were thinking of moving to Kubernetes and 
didn't know how dependencies were handled there [they aren't, yet, see #21092]. 
We could install dependencies in our target Docker images using Pipfiles, but 
submitting a Pipfile with our individual jobs would be a much cleaner solution.)

Thanks!


---




[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2018-06-07 Thread zjffdu
Github user zjffdu commented on the issue:

https://github.com/apache/spark/pull/13599
  
Thanks for the interest in this PR and for the info about `Pipfiles`. I think 
we could support that after this PR gets merged, so that we can give users 
more options for virtualenv based on their environment.


---




[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/13599
  
@holdenk and @zjffdu, I believe manual tests are fine if it's difficult 
to write an automated test. We can test manually and expose this as an experimental 
feature too.

BTW, I believe we can still have some tests that check, for example, that at 
least the string is properly constructed - 
https://github.com/apache/spark/pull/13599/files#r175670974. I think that could 
be enough for now. I happened to look into this multiple times over the past few 
years, and I think it's better to move forward than to keep blocking here.


---




[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/13599
  
@JoshRosen, I heard that you took a look at this before. Do you 
have any concerns that should be addressed?


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193640783
  
--- Diff: 
core/src/main/scala/org/apache/spark/barrier/BarrierTaskContext.scala ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.barrier
+
+import java.util.Properties
+
+import org.apache.spark.{SparkEnv, TaskContextImpl}
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.util.RpcUtils
+
+class BarrierTaskContext(
--- End diff --

BarrierTaskContextImpl?


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193644506
  
--- Diff: 
core/src/main/scala/org/apache/spark/barrier/BarrierCoordinator.scala ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.barrier
+
+import java.util.{Timer, TimerTask}
+
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+
+class BarrierCoordinator(
+numTasks: Int,
+timeout: Long,
+override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
+
+  private var epoch = 0
+
+  private val timer = new Timer("BarrierCoordinator epoch increment timer")
+
+  private val syncRequests = new 
scala.collection.mutable.ArrayBuffer[RpcCallContext](numTasks)
+
+  private def replyIfGetAllSyncRequest(): Unit = {
+if (syncRequests.length == numTasks) {
+  syncRequests.foreach(_.reply(()))
+  syncRequests.clear()
+  epoch += 1
+}
+  }
+
+  override def receive: PartialFunction[Any, Unit] = {
+case IncreaseEpoch(previousEpoch) =>
+  if (previousEpoch == epoch) {
+syncRequests.foreach(_.sendFailure(new RuntimeException(
+  s"The coordinator cannot get all barrier sync requests within 
$timeout ms.")))
--- End diff --

Have we considered incrementally increasing the timeout when we can't get 
all barrier sync requests within an epoch?
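
A minimal sketch of what such a backoff could look like (names such as 
`initialTimeout` and `maxTimeout` are illustrative only, not part of this PR):

```scala
// Sketch only: exponential backoff of the barrier sync timeout, capped at a maximum.
class EscalatingTimeout(initialTimeout: Long, maxTimeout: Long) {
  private var currentTimeout = initialTimeout

  /** Called when an epoch fails to collect all sync requests in time. */
  def onEpochTimeout(): Long = {
    currentTimeout = math.min(currentTimeout * 2, maxTimeout)
    currentTimeout
  }

  /** Called when an epoch completes successfully; reset to the base value. */
  def onEpochSuccess(): Unit = {
    currentTimeout = initialTimeout
  }

  def value: Long = currentTimeout
}
```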


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193658009
  
--- Diff: 
core/src/main/scala/org/apache/spark/barrier/BarrierCoordinator.scala ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.barrier
+
+import java.util.{Timer, TimerTask}
+
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+
+class BarrierCoordinator(
+numTasks: Int,
+timeout: Long,
+override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
+
+  private var epoch = 0
+
+  private val timer = new Timer("BarrierCoordinator epoch increment timer")
+
+  private val syncRequests = new 
scala.collection.mutable.ArrayBuffer[RpcCallContext](numTasks)
+
+  private def replyIfGetAllSyncRequest(): Unit = {
+if (syncRequests.length == numTasks) {
+  syncRequests.foreach(_.reply(()))
+  syncRequests.clear()
+  epoch += 1
+}
+  }
+
+  override def receive: PartialFunction[Any, Unit] = {
+case IncreaseEpoch(previousEpoch) =>
+  if (previousEpoch == epoch) {
+syncRequests.foreach(_.sendFailure(new RuntimeException(
+  s"The coordinator cannot get all barrier sync requests within 
$timeout ms.")))
+syncRequests.clear()
+epoch += 1
+  }
+  }
+
+  override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+case RequestToSync(epoch) =>
+  if (epoch == this.epoch) {
+if (syncRequests.isEmpty) {
+  val currentEpoch = epoch
+  timer.schedule(new TimerTask {
+override def run(): Unit = {
+  // self can be null after this RPC endpoint is stopped.
+  if (self != null) self.send(IncreaseEpoch(currentEpoch))
--- End diff --

Once this epoch fails to sync, the stage will be failed and resubmitted. I 
think it will begin from a new task set, so `IncreaseEpoch` seems useless because 
it doesn't really increase the epoch?


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193647168
  
--- Diff: 
core/src/main/scala/org/apache/spark/barrier/BarrierCoordinator.scala ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.barrier
+
+import java.util.{Timer, TimerTask}
+
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+
+class BarrierCoordinator(
+numTasks: Int,
+timeout: Long,
+override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
+
+  private var epoch = 0
+
+  private val timer = new Timer("BarrierCoordinator epoch increment timer")
+
+  private val syncRequests = new 
scala.collection.mutable.ArrayBuffer[RpcCallContext](numTasks)
+
+  private def replyIfGetAllSyncRequest(): Unit = {
+if (syncRequests.length == numTasks) {
+  syncRequests.foreach(_.reply(()))
+  syncRequests.clear()
+  epoch += 1
+}
+  }
+
+  override def receive: PartialFunction[Any, Unit] = {
+case IncreaseEpoch(previousEpoch) =>
+  if (previousEpoch == epoch) {
+syncRequests.foreach(_.sendFailure(new RuntimeException(
+  s"The coordinator cannot get all barrier sync requests within 
$timeout ms.")))
+syncRequests.clear()
+epoch += 1
+  }
+  }
+
+  override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+case RequestToSync(epoch) =>
+  if (epoch == this.epoch) {
+if (syncRequests.isEmpty) {
+  val currentEpoch = epoch
+  timer.schedule(new TimerTask {
+override def run(): Unit = {
+  // self can be null after this RPC endpoint is stopped.
+  if (self != null) self.send(IncreaseEpoch(currentEpoch))
+}
+  }, timeout)
+}
+
+syncRequests += context
+replyIfGetAllSyncRequest()
+  }
--- End diff --

```scala
if (epoch == this.epoch) {
  ...
} else {
  // Received an RpcCallContext from a failed previous epoch.
  context.sendFailure(new RuntimeException(
    s"The coordinator cannot get all barrier sync requests within $timeout ms."))
}
```


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193649314
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -359,17 +368,42 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks 
on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, 
NO_PREF, RACK_LOCAL, ANY
 for (taskSet <- sortedTaskSets) {
-  var launchedAnyTask = false
-  var launchedTaskAtCurrentMaxLocality = false
-  for (currentMaxLocality <- taskSet.myLocalityLevels) {
-do {
-  launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-taskSet, currentMaxLocality, shuffledOffers, availableCpus, 
tasks)
-  launchedAnyTask |= launchedTaskAtCurrentMaxLocality
-} while (launchedTaskAtCurrentMaxLocality)
-  }
-  if (!launchedAnyTask) {
-taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
+  // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
+  if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+// Skip the launch process.
--- End diff --

Should we log something here instead of silently skipping the barrier task set?
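
For example, something along these lines (a sketch; the message wording and 
log level are only a suggestion):

```scala
import org.apache.spark.internal.Logging

// Sketch: log instead of silently skipping a barrier task set that cannot be
// launched yet. `numTasks` and `availableSlots` mirror the values in the diff above.
object BarrierSkipLogging extends Logging {
  def logSkippedBarrierStage(stageId: Int, numTasks: Int, availableSlots: Int): Unit = {
    logInfo(s"Skipping barrier stage $stageId because it requires $numTasks slots " +
      s"but only $availableSlots are currently available.")
  }
}
```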


---




[GitHub] spark pull request #21494: [WIP][SPARK-24375][Prototype] Support barrier sch...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21494#discussion_r193648185
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1310,6 +1311,44 @@ class DAGScheduler(
 }
 }
 
+  case failure: TaskFailedReason if task.isBarrier =>
+// Always fail the current stage and retry all the tasks when a 
barrier task fail.
+val failedStage = stageIdToStage(task.stageId)
+logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
+  "due to a barrier task failed.")
+val message = "Stage failed because a barrier task finished 
unsuccessfully. " +
+  s"${failure.toErrorString}"
+try { // cancelTasks will fail if a SchedulerBackend does not 
implement killTask
+  taskScheduler.cancelTasks(stageId, interruptThread = false)
+} catch {
+  case e: UnsupportedOperationException =>
+logInfo(s"Could not cancel tasks for stage $stageId", e)
--- End diff --

Under barrier execution, will it be a problem if we cannot cancel the tasks?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193659778
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,115 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for PySpark
--- End diff --

@zjffdu, mind if I ask you to describe this as an experimental feature that is 
likely to be unstable and is still evolving?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193664416
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,115 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for PySpark
--- End diff --

Thanks @HyukjinKwon , doc is updated


---




[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/13599
  
**[Test build #91519 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91519/testReport)**
 for PR 13599 at commit 
[`3c02852`](https://github.com/apache/spark/commit/3c0285219b45cb2e5b3b7e21f6b9b0fceb72ac62).


---




[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/13599
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/13599
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3830/
Test PASSed.


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread ssonker
GitHub user ssonker opened a pull request:

https://github.com/apache/spark/pull/21505

[SPARK-24457][SQL] Improving performance of stringToTimestamp by cach…

…ing Calendar instances for input timezones instead of creating new 
everytime

## What changes were proposed in this pull request?

As of now, the stringToTimestamp function in DateTimeUtils creates a Calendar 
instance on each call. This change maintains a thread-local timezone-to-calendar 
map and creates just one Calendar per timezone. Whenever a Calendar instance is 
requested for a given timezone, it is looked up in the map, reinitialized, and 
returned.

## How was this patch tested?

Using existing test cases.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ssonker/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21505.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21505


commit 84d5a911408411f327b620bb958b996e55264781
Author: Sharad Sonker 
Date:   2018-06-07T04:56:37Z

[SPARK-24457][SQL] Improving performance of stringToTimestamp by caching 
Calendar instances for input timezones instead of creating new everytime




---




[GitHub] spark issue #21505: [SPARK-24457][SQL] Improving performance of stringToTime...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21505
  
Can one of the admins verify this patch?


---




[GitHub] spark issue #21505: [SPARK-24457][SQL] Improving performance of stringToTime...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21505
  
Can one of the admins verify this patch?


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193674578
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,24 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
+  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
+mutable.Map[TimeZone, (Calendar, Long)]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+val c = Calendar.getInstance(timeZone)
+(c, c.getTimeInMillis)
--- End diff --

When you get the calendar instance next time, isn't its time out of date?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193672797
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,115 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for PySpark (This is an experimental feature and may evolve 
in future version)
+For simple PySpark application, we can use `--py-files` to add its 
dependencies. While for a large PySpark application,
+usually you will have many dependencies which may also have transitive 
dependencies and even some dependencies need to be compiled
+first to be installed. In this case `--py-files` is not so convenient. 
Luckily, in python world we have virtualenv/conda to help create isolated
+python work environment. We also implement virtualenv in PySpark (It is 
only supported in yarn mode for now). User can use this feature
+in 2 scenarios:
+* Batch mode (submit spark app via spark-submit)
+* Interactive mode (PySpark shell or other third party Spark Notebook)
+
+## Prerequisites
+- Each node have virtualenv/conda, python-devel installed
+- Each node is internet accessible (for downloading packages)
+
+## Batch Mode
+
+In batch mode, user need to specify the additional python packages before 
launching spark app. There're 2 approaches to specify that:
+* Provide a requirement file which contains all the packages for the 
virtualenv.  
+* Specify packages via spark configuration 
`spark.pyspark.virtualenv.packages`.
+
+Here're several examples:
+
+{% highlight bash %}
+### Setup virtualenv using native virtualenv on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=native" \
+--conf 
"spark.pyspark.virtualenv.requirements=" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+
+### Setup virtualenv using conda on yarn-client mode
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=conda" \
+--conf 
"spark.pyspark.virtualenv.requirements=" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+
+### Setup virtualenv using conda on yarn-client mode and specify packages 
via `spark.pyspark.virtualenv.packages`
+bin/spark-submit \
+--master yarn \
+--deploy-mode client \
+--conf "spark.pyspark.virtualenv.enabled=true" \
+--conf "spark.pyspark.virtualenv.type=conda" \
+--conf "spark.pyspark.virtualenv.packages=numpy,pandas" \
+--conf "spark.pyspark.virtualenv.bin.path=" \
+
+{% endhighlight %}
+
+### How to create requirement file ?
+Usually before running distributed PySpark job, you need first to run it 
in local environment. It is encouraged to first create your own virtualenv for 
your project, so you know what packages you need. After you are confident with 
your work and want to move it to cluster, you can run the following command to 
generate the requirement file for virtualenv and conda.
+- pip freeze > requirements.txt
+- conda list --export  > requirements.txt
+
+## Interactive Mode
+In interactive mode,user can install python packages at runtime instead 
of specifying them in requirement file when submitting spark app.
+Here are several ways to install packages
+
+{% highlight python %}
+sc.install_packages("numpy")   # install the latest numpy
--- End diff --

Seems there are tabs here. Shall we replace them with spaces?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193674619
  
--- Diff: python/pyspark/context.py ---
@@ -1035,6 +1044,41 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+Install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
+"""
+if self._conf.get("spark.pyspark.virtualenv.enabled") != "true":
+raise RuntimeError("install_packages can only use called when "
+   "spark.pyspark.virtualenv.enabled is set as 
true")
+if isinstance(packages, basestring):
+packages = [packages]
+# seems statusTracker.getExecutorInfos() will return driver + 
exeuctors, so -1 here.
+num_executors = 
len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1
+dummyRDD = self.parallelize(range(num_executors), num_executors)
+
+def _run_pip(packages, iterator):
+import pip
+return pip.main(["install"] + packages)
+
+# install package on driver first. if installation succeeded, 
continue the installation
+# on executors, otherwise return directly.
+if _run_pip(packages, None) != 0:
+return
+
+virtualenvPackages = 
self._conf.get("spark.pyspark.virtualenv.packages")
+if virtualenvPackages:
+self._conf.set("spark.pyspark.virtualenv.packages", 
virtualenvPackages + ":" +
+   ":".join(packages))
+else:
+self._conf.set("spark.pyspark.virtualenv.packages", 
":".join(packages))
+
+import functools
--- End diff --

Can we move this up within this function?


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193673500
  
--- Diff: docs/submitting-applications.md ---
@@ -218,6 +218,115 @@ These commands can be used with `pyspark`, 
`spark-shell`, and `spark-submit` to
 For Python, the equivalent `--py-files` option can be used to distribute 
`.egg`, `.zip` and `.py` libraries
 to executors.
 
+# VirtualEnv for PySpark (This is an experimental feature and may evolve 
in future version)
+For simple PySpark application, we can use `--py-files` to add its 
dependencies. While for a large PySpark application,
+usually you will have many dependencies which may also have transitive 
dependencies and even some dependencies need to be compiled
+first to be installed. In this case `--py-files` is not so convenient. 
Luckily, in python world we have virtualenv/conda to help create isolated
+python work environment. We also implement virtualenv in PySpark (It is 
only supported in yarn mode for now). User can use this feature
+in 2 scenarios:
+* Batch mode (submit spark app via spark-submit)
+* Interactive mode (PySpark shell or other third party Spark Notebook)
+
--- End diff --

Ah, maybe we can leave a note at the end instead of adding it in the title.

```
Note that this is an experimental feature added from Spark 2.4.0 and may 
evolve in the future version.
```


---




[GitHub] spark pull request #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pys...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/13599#discussion_r193674158
  
--- Diff: python/pyspark/context.py ---
@@ -1035,6 +1044,41 @@ def getConf(self):
 conf.setAll(self._conf.getAll())
 return conf
 
+def install_packages(self, packages):
+"""
+Install python packages on all executors and driver through pip. 
pip will be installed
+by default no matter using native virtualenv or conda. So it is 
guaranteed that pip is
+available if virtualenv is enabled.
+:param packages: string for single package or a list of string for 
multiple packages
--- End diff --

Shall we add:

```
.. versionadded:: 2.3.0
.. note:: Experimental
```


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread ssonker
Github user ssonker commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193675439
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,24 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
+  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
+mutable.Map[TimeZone, (Calendar, Long)]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+val c = Calendar.getInstance(timeZone)
+(c, c.getTimeInMillis)
--- End diff --

Please refer to line 130 for this. Before the calendar instance is returned, it 
is reinitialized to the time at which it was originally created.


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread ssonker
Github user ssonker commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193675674
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,24 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
+  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
+mutable.Map[TimeZone, (Calendar, Long)]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+val c = Calendar.getInstance(timeZone)
+(c, c.getTimeInMillis)
--- End diff --

@viirya ^ Does that answer your question, or did you mean something else?


---




[GitHub] spark pull request #21499: [SPARK-24468][SQL] Handle negative scale when adj...

2018-06-07 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21499#discussion_r193675689
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala ---
@@ -161,13 +161,17 @@ object DecimalType extends AbstractDataType {
* This method is used only when 
`spark.sql.decimalOperations.allowPrecisionLoss` is set to true.
*/
   private[sql] def adjustPrecisionScale(precision: Int, scale: Int): 
DecimalType = {
-// Assumptions:
+// Assumption:
 assert(precision >= scale)
-assert(scale >= 0)
 
 if (precision <= MAX_PRECISION) {
   // Adjustment only needed when we exceed max precision
   DecimalType(precision, scale)
+} else if (scale < 0) {
+  // Decimal can have negative scale (SPARK-24468). In this case, we 
cannot allow a precision
+  // loss since we would cause a loss of digits in the integer part.
--- End diff --

I am adding it, thanks.
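
A quick worked example of why that matters, assuming the usual decimal semantics 
where the value is `unscaledValue * 10^(-scale)` and the integer part therefore 
has `precision - scale` digits:

```scala
// Sketch: why allowing precision loss with a negative scale drops integer digits.
val maxPrecision = 38
val precision = 40
val scale = -5

val integerDigits = precision - scale        // 45 digits before the decimal point
val afterCapping  = maxPrecision - scale     // 43 digits if precision were capped at 38
// Capping the precision would silently lose 45 - 43 = 2 digits from the integer part.
```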


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193676413
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,24 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
+  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
+mutable.Map[TimeZone, (Calendar, Long)]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+val c = Calendar.getInstance(timeZone)
+(c, c.getTimeInMillis)
--- End diff --

Isn't `timeInMillis` also stored when you first insert this map entry? So the 
next time you access this calendar, you just set it to the old 
`timeInMillis`, don't you?


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread ssonker
Github user ssonker commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193676953
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,24 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
+  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
+mutable.Map[TimeZone, (Calendar, Long)]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+val c = Calendar.getInstance(timeZone)
+(c, c.getTimeInMillis)
--- End diff --

Yes, correct.


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193678440
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,24 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
+  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
+mutable.Map[TimeZone, (Calendar, Long)]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+val c = Calendar.getInstance(timeZone)
+(c, c.getTimeInMillis)
+  })
+c.clear()
+c.setTimeInMillis(timeInMillis)
--- End diff --

I agree with @viirya's comment. Do we need to set the value to 
`System.currentTimeMillis()` instead?
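
Roughly what that would look like (a sketch of the cached-Calendar idea being 
discussed here, not the final code in this PR):

```scala
import java.util.{Calendar, TimeZone}
import scala.collection.mutable

// Sketch: cache one Calendar per (thread, time zone) and reset it to "now" on each
// access, so the instance is reused but never returns a stale time.
object CalendarCache {
  private val perThread = new ThreadLocal[mutable.Map[TimeZone, Calendar]] {
    override def initialValue(): mutable.Map[TimeZone, Calendar] =
      mutable.Map[TimeZone, Calendar]()
  }

  def getCalendar(timeZone: TimeZone): Calendar = {
    val c = perThread.get().getOrElseUpdate(timeZone, Calendar.getInstance(timeZone))
    c.clear()                                     // drop any previously set fields
    c.setTimeInMillis(System.currentTimeMillis()) // reinitialize to the current instant
    c
  }
}
```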


---




[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193678459
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (partition_id, epoch_id))
+... return True
+... d

[GitHub] spark issue #21061: [SPARK-23914][SQL] Add array_union function

2018-06-07 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21061
  
Let me think about the implementation to keep the order.


---




[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-07 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
cc @cloud-fan 


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread ssonker
Github user ssonker commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193679978
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,24 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
+  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
+mutable.Map[TimeZone, (Calendar, Long)]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+val c = Calendar.getInstance(timeZone)
+(c, c.getTimeInMillis)
+  })
+c.clear()
+c.setTimeInMillis(timeInMillis)
--- End diff --

@kiszk Thanks, I'm updating that. BTW, can you please help me understand a 
scenario where that is absolutely needed?


---




[GitHub] spark pull request #18900: [SPARK-21687][SQL] Spark SQL should set createTim...

2018-06-07 Thread cxzl25
Github user cxzl25 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18900#discussion_r193685282
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
---
@@ -1019,6 +1021,8 @@ private[hive] object HiveClientImpl {
 compressed = apiPartition.getSd.isCompressed,
 properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
   .map(_.asScala.toMap).orNull),
+  createTime = apiPartition.getCreateTime.toLong * 1000,
+  lastAccessTime = apiPartition.getLastAccessTime.toLong * 1000)
--- End diff --

Add a comma to the end?


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread ssonker
Github user ssonker commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193686772
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,24 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
+  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
+mutable.Map[TimeZone, (Calendar, Long)]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+val c = Calendar.getInstance(timeZone)
+(c, c.getTimeInMillis)
+  })
+c.clear()
+c.setTimeInMillis(timeInMillis)
--- End diff --

@kiszk @viirya I've updated the code. Please review.


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193687346
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -114,20 +114,19 @@ object DateTimeUtils {
   }
 
   private val threadLocalComputedCalendarsMap =
-new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
-  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
-mutable.Map[TimeZone, (Calendar, Long)]()
+new ThreadLocal[mutable.Map[TimeZone, Calendar]] {
+  override def initialValue(): mutable.Map[TimeZone, Calendar] = {
+mutable.Map[TimeZone, Calendar]()
   }
 }
 
   def getCalendar(timeZone: TimeZone): Calendar = {
-val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+val c = threadLocalComputedCalendarsMap.get()
   .getOrElseUpdate(timeZone, {
-val c = Calendar.getInstance(timeZone)
-(c, c.getTimeInMillis)
+Calendar.getInstance(timeZone)
   })
 c.clear()
-c.setTimeInMillis(timeInMillis)
+c.setTimeInMillis(System.currentTimeMillis())
--- End diff --

hmm, I think `System.currentTimeMillis()`  is UTC-based?


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193687670
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -114,20 +114,19 @@ object DateTimeUtils {
   }
 
   private val threadLocalComputedCalendarsMap =
-new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
-  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
-mutable.Map[TimeZone, (Calendar, Long)]()
+new ThreadLocal[mutable.Map[TimeZone, Calendar]] {
+  override def initialValue(): mutable.Map[TimeZone, Calendar] = {
+mutable.Map[TimeZone, Calendar]()
   }
 }
 
   def getCalendar(timeZone: TimeZone): Calendar = {
-val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+val c = threadLocalComputedCalendarsMap.get()
   .getOrElseUpdate(timeZone, {
-val c = Calendar.getInstance(timeZone)
-(c, c.getTimeInMillis)
+Calendar.getInstance(timeZone)
   })
 c.clear()
-c.setTimeInMillis(timeInMillis)
+c.setTimeInMillis(System.currentTimeMillis())
--- End diff --

oh, Calendar.getTimeInMillis and setTimeInMillis are also UTC-based.


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193688565
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,23 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, Calendar]] {
+  override def initialValue(): mutable.Map[TimeZone, Calendar] = {
+mutable.Map[TimeZone, Calendar]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val c = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+Calendar.getInstance(timeZone)
+  })
+c.clear()
--- End diff --

Doesn't `clear` reset the timezone of that `Calendar` instance?


To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21505: [SPARK-24457][SQL] Improving performance of stringToTime...

2018-06-07 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21505
  
We would appreciate it if you could post the performance numbers from before and 
after this PR. It would be good to use the `Benchmark` class.
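
A rough sketch of such a micro-benchmark (the `Benchmark` utility's package and 
exact constructor have moved between Spark versions, and the iteration count and 
input string below are arbitrary, so treat this as an outline rather than exact code):

```scala
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Benchmark  // package may differ across Spark versions

// Sketch: measure stringToTimestamp so before/after numbers can go in the PR description.
object StringToTimestampBenchmark {
  def main(args: Array[String]): Unit = {
    val n = 10L * 1000 * 1000
    val input = UTF8String.fromString("2018-06-07 12:34:56.789")
    val benchmark = new Benchmark("stringToTimestamp", n)
    benchmark.addCase("parse timestamp string") { _ =>
      var i = 0L
      while (i < n) {
        DateTimeUtils.stringToTimestamp(input)
        i += 1
      }
    }
    benchmark.run()
  }
}
```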


---




[GitHub] spark pull request #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP P...

2018-06-07 Thread DazhuangSu
Github user DazhuangSu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19691#discussion_r193691275
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -510,40 +511,86 @@ case class AlterTableRenamePartitionCommand(
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, 
...] [PURGE];
+ *   ALTER TABLE table DROP [IF EXISTS] PARTITION (spec1, expr1)
+ * [, PARTITION (spec2, expr2), ...] [PURGE];
  * }}}
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitions: Seq[(TablePartitionSpec, Seq[Expression])],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
 val table = catalog.getTableMetadata(tableName)
+val resolver = sparkSession.sessionState.conf.resolver
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val toDrop = partitions.flatMap { partition =>
+  if (partition._1.isEmpty && !partition._2.isEmpty) {
+// There are only expressions in this drop condition.
+extractFromPartitionFilter(partition._2, catalog, table, resolver)
+  } else if (!partition._1.isEmpty && partition._2.isEmpty) {
+// There are only partitionSpecs in this drop condition.
+extractFromPartitionSpec(partition._1, table, resolver)
+  } else if (!partition._1.isEmpty && !partition._2.isEmpty) {
+// This drop condition has both partitionSpecs and expressions.
+extractFromPartitionFilter(partition._2, catalog, table, 
resolver).intersect(
--- End diff --

Hi @mgaido91, there is one problem after I changed the syntax:
when I run the SQL `DROP PARTITION (p >=2)`, it throws
`org.apache.spark.sql.AnalysisException: cannot resolve 'p' given input 
columns: []`.
I'm trying to figure out how to fix it.

By the way, is a syntax like `((partitionVal (',' partitionVal)*) | 
(expression (',' expression)*))` legal? I wrote an ANTLR4 syntax test, 
but it didn't work as I expected.

Besides, I was wrong the other day: the if conditions won't be 
inefficient if there are a lot of partitions. They may be inefficient if there are 
a lot of dropPartitionSpecs, which I don't think can happen easily.
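
As a hedged illustration of where the "given input columns: []" error likely comes 
from: the drop-condition expression needs to be resolved against the table's 
partition columns rather than an empty schema. The helper below is hypothetical 
and not code from this PR:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StructType

// Hypothetical sketch: build the attributes that an expression such as `p >= 2`
// should be resolved against -- the table's partition columns, not an empty schema.
object PartitionFilterResolution {
  def partitionAttributes(partitionSchema: StructType): Seq[Attribute] = {
    partitionSchema.map { field =>
      AttributeReference(field.name, field.dataType, field.nullable)()
    }
  }
}
```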


---




[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread ssonker
Github user ssonker commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193694372
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,23 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, Calendar]] {
+  override def initialValue(): mutable.Map[TimeZone, Calendar] = {
+mutable.Map[TimeZone, Calendar]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val c = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+Calendar.getInstance(timeZone)
+  })
+c.clear()
--- End diff --

Nope. It clears all the calendar ```fields```, and the time zone is not a field.
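
A quick standalone check of that point (plain JDK classes, not code from this PR):

```scala
import java.util.{Calendar, TimeZone}

// clear() resets the date/time fields to "unset", but the zone is carried by
// the Calendar instance itself, so it survives the call.
val c = Calendar.getInstance(TimeZone.getTimeZone("Asia/Tokyo"))
c.clear()
assert(c.getTimeZone.getID == "Asia/Tokyo")
```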


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21499
  
**[Test build #91520 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91520/testReport)**
 for PR 21499 at commit 
[`7f24206`](https://github.com/apache/spark/commit/7f242064ade10c02733e29a132aec5fc9af9b887).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193696451
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,23 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, Calendar]] {
--- End diff --

Do we need to keep a Calendar per time zone? Since `getCalendar` takes a 
time zone as input, we could keep just one Calendar instance and set the given 
time zone on it inside `getCalendar`. WDYT? Regarding performance, is there a big difference?
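
A minimal sketch of that alternative, reusing the thread-local pattern from the surrounding code; this is illustrative only, not the PR's implementation, and the object name is made up:

```scala
import java.util.{Calendar, TimeZone}

object SingleCalendarSketch {
  // One cached Calendar per thread; re-target it to the requested zone on each call.
  private val threadLocalCalendar = new ThreadLocal[Calendar] {
    override def initialValue(): Calendar = Calendar.getInstance()
  }

  def getCalendar(timeZone: TimeZone): Calendar = {
    val c = threadLocalCalendar.get()
    c.clear()               // reset fields left over from the previous use
    c.setTimeZone(timeZone) // swap in the requested zone instead of caching one Calendar per zone
    c
  }
}
```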


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21499
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3831/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21499
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/21502#discussion_r193724774
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 ---
@@ -152,6 +152,26 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
 }
   }
 
+  test("SPARK-22575: remove allocated blocks when they are not needed 
anymore") {
+val df1 = Seq((1, "4"), (2, "2")).toDF("key", "value")
+val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
+val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
+  case b: BroadcastHashJoinExec => b
+}.size
+assert(numBroadCastHashJoin > 0)
+df3.collect()
+df3.destroy()
+val blockManager = sparkContext.env.blockManager
+val blocks = blockManager.getMatchingBlockIds(blockId => {
+  blockId.isBroadcast && 
blockManager.getStatus(blockId).get.storageLevel.deserialized
+}).distinct
+val blockValues = blocks.flatMap { id =>
+  blockManager.getSingle[Any](id)
+}
--- End diff --

This may be the root cause of the unstable UT failure: the block can't 
be deleted immediately. I added a sleep and the test passes every time; you could give it a try.
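
As an alternative to a fixed sleep, the assertion could be retried, e.g. with ScalaTest's `Eventually`. A sketch only: `blockManager` comes from the test above, and the timeout/interval values are placeholders:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Retry the check until the broadcast blocks have actually been removed,
// instead of sleeping for a fixed amount of time.
eventually(timeout(10.seconds), interval(100.millis)) {
  val remaining = blockManager.getMatchingBlockIds(_.isBroadcast)
  assert(remaining.isEmpty)
}
```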


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21467
  
**[Test build #91521 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91521/testReport)**
 for PR 21467 at commit 
[`9724640`](https://github.com/apache/spark/commit/9724640c534f3f1600ae3c37988479e7d0500cd0).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18900: [SPARK-21687][SQL] Spark SQL should set createTim...

2018-06-07 Thread debugger87
Github user debugger87 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18900#discussion_r193730957
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
---
@@ -1019,6 +1021,8 @@ private[hive] object HiveClientImpl {
 compressed = apiPartition.getSd.isCompressed,
 properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
   .map(_.asScala.toMap).orNull),
+  createTime = apiPartition.getCreateTime.toLong * 1000,
+  lastAccessTime = apiPartition.getLastAccessTime.toLong * 1000)
--- End diff --

@cxzl25 yeah, it's my mistake, I will fix it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18900: [SPARK-21687][SQL] Spark SQL should set createTime for H...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18900
  
**[Test build #91522 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91522/testReport)**
 for PR 18900 at commit 
[`b0846c3`](https://github.com/apache/spark/commit/b0846c39a94d729ec0324cc72b98861da7c073c7).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21469
  
**[Test build #91523 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91523/testReport)**
 for PR 21469 at commit 
[`3c80cad`](https://github.com/apache/spark/commit/3c80cad32c056a24a7f5ffd7ab0ae3f7e096a62d).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21467
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91521/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21467
  
**[Test build #91521 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91521/testReport)**
 for PR 21467 at commit 
[`9724640`](https://github.com/apache/spark/commit/9724640c534f3f1600ae3c37988479e7d0500cd0).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21467
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/13599
  
**[Test build #91519 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91519/testReport)**
 for PR 13599 at commit 
[`3c02852`](https://github.com/apache/spark/commit/3c0285219b45cb2e5b3b7e21f6b9b0fceb72ac62).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: 
Boolean)`
  * `  class DriverEndpoint(override val rpcEnv: RpcEnv)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/13599
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91519/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/13599
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193740695
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible for all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserialized copy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if it exists) will be called if the 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (partition_id, epoch_id))
+... return True
+... d

[GitHub] spark pull request #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21502#discussion_r193742604
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 ---
@@ -152,6 +152,26 @@ class BroadcastJoinSuite extends QueryTest with 
SQLTestUtils {
 }
   }
 
+  test("SPARK-22575: remove allocated blocks when they are not needed 
anymore") {
+val df1 = Seq((1, "4"), (2, "2")).toDF("key", "value")
+val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+val df3 = df1.join(broadcast(df2), Seq("key"), "inner")
+val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
+  case b: BroadcastHashJoinExec => b
+}.size
+assert(numBroadCastHashJoin > 0)
+df3.collect()
+df3.destroy()
+val blockManager = sparkContext.env.blockManager
+val blocks = blockManager.getMatchingBlockIds(blockId => {
+  blockId.isBroadcast && 
blockManager.getStatus(blockId).get.storageLevel.deserialized
+}).distinct
+val blockValues = blocks.flatMap { id =>
+  blockManager.getSingle[Any](id)
+}
--- End diff --

I ran the test 1 times and I cannot reproduce the issue locally. Can 
you?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193736960
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
 ---
@@ -117,101 +131,170 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
 }
 
 def makeSortMergeJoin(
-leftKeys: Seq[Expression],
-rightKeys: Seq[Expression],
-boundCondition: Option[Expression],
-leftPlan: SparkPlan,
-rightPlan: SparkPlan) = {
-  val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, 
Inner, boundCondition,
-leftPlan, rightPlan)
+   leftKeys: Seq[Expression],
+   rightKeys: Seq[Expression],
+   boundCondition: Option[Expression],
+   rangeConditions: Seq[BinaryComparison],
+   leftPlan: SparkPlan,
+   rightPlan: SparkPlan) = {
+  val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, 
Inner, rangeConditions,
+boundCondition, leftPlan, rightPlan)
   EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin)
 }
 
-test(s"$testName using BroadcastHashJoin (build=left)") {
-  extractJoinParts().foreach { case (_, leftKeys, rightKeys, 
boundCondition, _, _) =>
-withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-  checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, 
rightPlan: SparkPlan) =>
-makeBroadcastHashJoin(
-  leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, 
joins.BuildLeft),
-expectedAnswer.map(Row.fromTuple),
-sortAnswers = true)
+val configOptions = List(
+  ("spark.sql.codegen.wholeStage", "true"),
+  ("spark.sql.codegen.wholeStage", "false"))
+
+// Disabling these because the code would never follow this path in 
case of a inner range join
+if (!expectRangeJoin) {
+  var counter = 1
--- End diff --

If you want to avoid a `var`, just `configOptions.zipWithIndex.foreach { 
case ((config, confValue), counter) =>`. Just a tiny bit more idiomatic.
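
Spelled out, the suggestion looks roughly like this (names taken from the test code above; runnable on its own):

```scala
// zipWithIndex supplies the counter, so no mutable var is needed.
val configOptions = List(
  ("spark.sql.codegen.wholeStage", "true"),
  ("spark.sql.codegen.wholeStage", "false"))

configOptions.zipWithIndex.foreach { case ((config, confValue), counter) =>
  println(s"variant #$counter: $config=$confValue")
}
```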


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193734550
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
 
   if (joinKeys.nonEmpty) {
 val (leftKeys, rightKeys) = joinKeys.unzip
-logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
-Some((joinType, leftKeys, rightKeys, 
otherPredicates.reduceOption(And), left, right))
+// Find any simple range expressions between two columns
+// (and involving only those two columns) of the two tables being 
joined,
+// which are not used in the equijoin expressions,
+// and which can be used for secondary sort optimizations.
+// rangePreds will contain the original expressions to be filtered 
out later.
+val rangePreds: mutable.Set[Expression] = mutable.Set.empty
+var rangeConditions: Seq[BinaryComparison] =
+  if (SQLConf.get.useSmjInnerRangeOptimization) {
+otherPredicates.flatMap {
+  case p@LessThan(l, r) => checkRangeConditions(l, r, left, 
right, joinKeys).map {
+case true => rangePreds.add(p); GreaterThan(r, l)
+case false => rangePreds.add(p); p
+  }
+  case p@LessThanOrEqual(l, r) =>
+checkRangeConditions(l, r, left, right, joinKeys).map {
+  case true => rangePreds.add(p); GreaterThanOrEqual(r, l)
+  case false => rangePreds.add(p); p
+}
+  case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, 
right, joinKeys).map {
+case true => rangePreds.add(p); LessThan(r, l)
+case false => rangePreds.add(p); p
+  }
+  case p@GreaterThanOrEqual(l, r) =>
+checkRangeConditions(l, r, left, right, joinKeys).map {
+  case true => rangePreds.add(p); LessThanOrEqual(r, l)
+  case false => rangePreds.add(p); p
+}
+  case _ => None
+}
+  } else {
+Nil
+  }
+
+// Only using secondary join optimization when both lower and 
upper conditions
+// are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
+if(rangeConditions.size != 2 ||
+// Looking for one < and one > comparison:
+rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
--- End diff --

Instead of checking `.size == 0`, something like 
`rangeConditions.forall(... not instance of either ...)`?
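
In other words, something along these lines; a sketch against the Catalyst expression types used above, with hypothetical helper names:

```scala
import org.apache.spark.sql.catalyst.expressions.{
  BinaryComparison, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual}

// Equivalent to ".filter(...).size == 0": no lower-bound comparison present.
def hasNoLowerBound(rangeConditions: Seq[BinaryComparison]): Boolean =
  rangeConditions.forall(c =>
    !c.isInstanceOf[LessThan] && !c.isInstanceOf[LessThanOrEqual])

// Same idea for the upper bound.
def hasNoUpperBound(rangeConditions: Seq[BinaryComparison]): Boolean =
  rangeConditions.forall(c =>
    !c.isInstanceOf[GreaterThan] && !c.isInstanceOf[GreaterThanOrEqual])
```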


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193735061
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
 
   if (joinKeys.nonEmpty) {
 val (leftKeys, rightKeys) = joinKeys.unzip
-logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
-Some((joinType, leftKeys, rightKeys, 
otherPredicates.reduceOption(And), left, right))
+// Find any simple range expressions between two columns
+// (and involving only those two columns) of the two tables being 
joined,
+// which are not used in the equijoin expressions,
+// and which can be used for secondary sort optimizations.
+// rangePreds will contain the original expressions to be filtered 
out later.
+val rangePreds: mutable.Set[Expression] = mutable.Set.empty
--- End diff --

I tend to prefer `val rangePreds = mutable.Set.empty[Expression]` as it's 
shorter, but that's just taste


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193735681
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
 
   if (joinKeys.nonEmpty) {
 val (leftKeys, rightKeys) = joinKeys.unzip
-logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
-Some((joinType, leftKeys, rightKeys, 
otherPredicates.reduceOption(And), left, right))
+// Find any simple range expressions between two columns
+// (and involving only those two columns) of the two tables being 
joined,
+// which are not used in the equijoin expressions,
+// and which can be used for secondary sort optimizations.
+// rangePreds will contain the original expressions to be filtered 
out later.
+val rangePreds: mutable.Set[Expression] = mutable.Set.empty
+var rangeConditions: Seq[BinaryComparison] =
+  if (SQLConf.get.useSmjInnerRangeOptimization) {
+otherPredicates.flatMap {
+  case p@LessThan(l, r) => checkRangeConditions(l, r, left, 
right, joinKeys).map {
+case true => rangePreds.add(p); GreaterThan(r, l)
+case false => rangePreds.add(p); p
+  }
+  case p@LessThanOrEqual(l, r) =>
+checkRangeConditions(l, r, left, right, joinKeys).map {
+  case true => rangePreds.add(p); GreaterThanOrEqual(r, l)
+  case false => rangePreds.add(p); p
+}
+  case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, 
right, joinKeys).map {
+case true => rangePreds.add(p); LessThan(r, l)
+case false => rangePreds.add(p); p
+  }
+  case p@GreaterThanOrEqual(l, r) =>
+checkRangeConditions(l, r, left, right, joinKeys).map {
+  case true => rangePreds.add(p); LessThanOrEqual(r, l)
+  case false => rangePreds.add(p); p
+}
+  case _ => None
+}
+  } else {
+Nil
+  }
+
+// Only using secondary join optimization when both lower and 
upper conditions
+// are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
+if(rangeConditions.size != 2 ||
+// Looking for one < and one > comparison:
+rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
+  x.isInstanceOf[LessThanOrEqual]).size == 0 ||
+rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
+  x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
+// Check if both comparisons reference the same columns:
+rangeConditions.flatMap(c => 
c.left.references.toSeq.distinct).distinct.size != 1 ||
+rangeConditions.flatMap(c => 
c.right.references.toSeq.distinct).distinct.size != 1) {
+  logDebug("Inner range optimization conditions not met. Clearing 
range conditions")
+  rangeConditions = Nil
+  rangePreds.clear()
+}
+
+Some((joinType, leftKeys, rightKeys, rangeConditions,
+  
otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, 
right))
   } else {
 None
   }
 case _ => None
   }
+
+  /**
+   * Checks if l and r are valid range conditions:
+   *   - l and r expressions should both contain a single reference to one 
and the same column.
+   *   - the referenced column should not be part of joinKeys
+   * If these conditions are not met, the function returns None.
+   *
+   * Otherwise, the function checks if the left plan contains l expression 
and the right plan
+   * contains r expression. If the expressions need to be switched, the 
function returns Some(true)
+   * and Some(false) otherwise.
+   */
+  private def checkRangeConditions(l : Expression, r : Expression,
+  left : LogicalPlan, right : LogicalPlan,
+  joinKeys : Seq[(Expression, Expression)]) = {
+val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
+if(lattrs.size != 1 || rattrs.size != 1) {
+  None
+}
+else if (canEvaluate(l, left) && canEvaluate(r, right)) {
--- End diff --

Nit: pull else onto previous line


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193733146
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
 
   if (joinKeys.nonEmpty) {
 val (leftKeys, rightKeys) = joinKeys.unzip
-logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
-Some((joinType, leftKeys, rightKeys, 
otherPredicates.reduceOption(And), left, right))
+// Find any simple range expressions between two columns
+// (and involving only those two columns) of the two tables being 
joined,
+// which are not used in the equijoin expressions,
+// and which can be used for secondary sort optimizations.
+// rangePreds will contain the original expressions to be filtered 
out later.
+val rangePreds: mutable.Set[Expression] = mutable.Set.empty
+var rangeConditions: Seq[BinaryComparison] =
+  if (SQLConf.get.useSmjInnerRangeOptimization) {
+otherPredicates.flatMap {
+  case p@LessThan(l, r) => checkRangeConditions(l, r, left, 
right, joinKeys).map {
+case true => rangePreds.add(p); GreaterThan(r, l)
+case false => rangePreds.add(p); p
+  }
+  case p@LessThanOrEqual(l, r) =>
+checkRangeConditions(l, r, left, right, joinKeys).map {
+  case true => rangePreds.add(p); GreaterThanOrEqual(r, l)
+  case false => rangePreds.add(p); p
+}
+  case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, 
right, joinKeys).map {
+case true => rangePreds.add(p); LessThan(r, l)
+case false => rangePreds.add(p); p
+  }
+  case p@GreaterThanOrEqual(l, r) =>
+checkRangeConditions(l, r, left, right, joinKeys).map {
+  case true => rangePreds.add(p); LessThanOrEqual(r, l)
+  case false => rangePreds.add(p); p
+}
+  case _ => None
+}
+  } else {
+Nil
+  }
+
+// Only using secondary join optimization when both lower and 
upper conditions
+// are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
+if(rangeConditions.size != 2 ||
--- End diff --

Nit: space after "if" here and elsewhere


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193743605
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1205,6 +1205,19 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val USE_SMJ_INNER_RANGE_OPTIMIZATION =
--- End diff --

Yes, at best make this internal. Are there conditions where you would not 
want to apply this? Is it just a safety valve?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193736438
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
 ---
@@ -70,27 +70,41 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
 (3, 2)
   ).toDF("a", "b")
 
+  private lazy val rangeTestData1 = Seq(
+(1, 3), (1, 4), (1, 7), (1, 8), (1, 10),
+(2, 1), (2, 2), (2, 3), (2, 8),
+(3, 1), (3, 2), (3, 3), (3, 5),
+(4, 1), (4, 2), (4, 3)
+  ).toDF("a", "b")
+
+  private lazy val rangeTestData2 = Seq(
+(1, 1), (1, 2), (1, 2), (1, 3), (1, 5), (1, 7), (1, 20),
+(2, 1), (2, 2), (2, 3), (2, 5), (2, 6),
+(3, 3), (3, 6)
+  ).toDF("a", "b")
+
   // Note: the input dataframes and expression must be evaluated lazily 
because
   // the SQLContext should be used only within a test to keep SQL tests 
stable
   private def testInnerJoin(
-  testName: String,
-  leftRows: => DataFrame,
-  rightRows: => DataFrame,
-  condition: () => Expression,
-  expectedAnswer: Seq[Product]): Unit = {
+ testName: String,
+ leftRows: => DataFrame,
+ rightRows: => DataFrame,
+ condition: () => Expression,
+ expectedAnswer: Seq[Product],
+ expectRangeJoin: Boolean = false): Unit = {
 
 def extractJoinParts(): Option[ExtractEquiJoinKeys.ReturnType] = {
   val join = Join(leftRows.logicalPlan, rightRows.logicalPlan, Inner, 
Some(condition()))
   ExtractEquiJoinKeys.unapply(join)
 }
 
 def makeBroadcastHashJoin(
-leftKeys: Seq[Expression],
-rightKeys: Seq[Expression],
-boundCondition: Option[Expression],
-leftPlan: SparkPlan,
-rightPlan: SparkPlan,
-side: BuildSide) = {
+   leftKeys: Seq[Expression],
--- End diff --

(Undo this whitespace change and the next one)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193737191
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.storage.BlockManager
+
+/**
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in 
an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached post which it will 
switch to a mode which
+ * would flush to disk after [[numRowsSpillThreshold]] is met (or before 
if there is
+ * excessive memory consumption). Setting these threshold involves 
following trade-offs:
+ *
+ * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory 
array may occupy more memory
+ *   than is available, resulting in OOM.
+ * - If [[numRowsSpillThreshold]] is too low, data will be spilled 
frequently and lead to
+ *   excessive disk writes. This may lead to a performance regression 
compared to the normal case
+ *   of using an [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class InMemoryUnsafeRowQueue(
--- End diff --

No way to avoid making a custom queue implementation here? Is it messier 
without such a structure?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193735968
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
 
   if (joinKeys.nonEmpty) {
 val (leftKeys, rightKeys) = joinKeys.unzip
-logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
-Some((joinType, leftKeys, rightKeys, 
otherPredicates.reduceOption(And), left, right))
+// Find any simple range expressions between two columns
+// (and involving only those two columns) of the two tables being 
joined,
+// which are not used in the equijoin expressions,
+// and which can be used for secondary sort optimizations.
+// rangePreds will contain the original expressions to be filtered 
out later.
+val rangePreds: mutable.Set[Expression] = mutable.Set.empty
+var rangeConditions: Seq[BinaryComparison] =
+  if (SQLConf.get.useSmjInnerRangeOptimization) {
+otherPredicates.flatMap {
+  case p@LessThan(l, r) => checkRangeConditions(l, r, left, 
right, joinKeys).map {
+case true => rangePreds.add(p); GreaterThan(r, l)
+case false => rangePreds.add(p); p
+  }
+  case p@LessThanOrEqual(l, r) =>
+checkRangeConditions(l, r, left, right, joinKeys).map {
+  case true => rangePreds.add(p); GreaterThanOrEqual(r, l)
+  case false => rangePreds.add(p); p
+}
+  case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, 
right, joinKeys).map {
+case true => rangePreds.add(p); LessThan(r, l)
+case false => rangePreds.add(p); p
+  }
+  case p@GreaterThanOrEqual(l, r) =>
+checkRangeConditions(l, r, left, right, joinKeys).map {
+  case true => rangePreds.add(p); LessThanOrEqual(r, l)
+  case false => rangePreds.add(p); p
+}
+  case _ => None
+}
+  } else {
+Nil
+  }
+
+// Only using secondary join optimization when both lower and 
upper conditions
+// are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
+if(rangeConditions.size != 2 ||
+// Looking for one < and one > comparison:
+rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
+  x.isInstanceOf[LessThanOrEqual]).size == 0 ||
+rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
+  x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
+// Check if both comparisons reference the same columns:
+rangeConditions.flatMap(c => 
c.left.references.toSeq.distinct).distinct.size != 1 ||
+rangeConditions.flatMap(c => 
c.right.references.toSeq.distinct).distinct.size != 1) {
+  logDebug("Inner range optimization conditions not met. Clearing 
range conditions")
+  rangeConditions = Nil
+  rangePreds.clear()
+}
+
+Some((joinType, leftKeys, rightKeys, rangeConditions,
+  
otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, 
right))
   } else {
 None
   }
 case _ => None
   }
+
+  /**
+   * Checks if l and r are valid range conditions:
+   *   - l and r expressions should both contain a single reference to one 
and the same column.
+   *   - the referenced column should not be part of joinKeys
+   * If these conditions are not met, the function returns None.
+   *
+   * Otherwise, the function checks if the left plan contains l expression 
and the right plan
+   * contains r expression. If the expressions need to be switched, the 
function returns Some(true)
+   * and Some(false) otherwise.
+   */
+  private def checkRangeConditions(l : Expression, r : Expression,
+  left : LogicalPlan, right : LogicalPlan,
+  joinKeys : Seq[(Expression, Expression)]) = {
--- End diff --

For clarity, add a return type to this method. `Option[Boolean]`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread zecevicp
Github user zecevicp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193751918
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1205,6 +1205,19 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val USE_SMJ_INNER_RANGE_OPTIMIZATION =
--- End diff --

It's just a safety valve, in case there are queries I don't foresee now 
where this optimization could get in the way.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread zecevicp
Github user zecevicp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193753823
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.storage.BlockManager
+
+/**
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in 
an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached post which it will 
switch to a mode which
+ * would flush to disk after [[numRowsSpillThreshold]] is met (or before 
if there is
+ * excessive memory consumption). Setting these threshold involves 
following trade-offs:
+ *
+ * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory 
array may occupy more memory
+ *   than is available, resulting in OOM.
+ * - If [[numRowsSpillThreshold]] is too low, data will be spilled 
frequently and lead to
+ *   excessive disk writes. This may lead to a performance regression 
compared to the normal case
+ *   of using an [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class InMemoryUnsafeRowQueue(
--- End diff --

A queue is needed here because it's a moving window rather than a fixed 
block of rows. Maybe I missed an existing class that could do this easily, so 
I'll take another look. But I believe any alternative would indeed be messier.
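
For illustration, the access pattern that makes a queue the natural fit (plain Scala collections, not the actual buffer class in this PR):

```scala
import scala.collection.mutable

// A moving window needs appends at the back (rows entering the range) and
// removals at the front (rows falling out of the range) -- exactly the Queue
// contract. A fixed block of rows only ever needs an append-only array.
val window = mutable.Queue[Int]()
window.enqueue(1, 2, 3)         // rows entering the range
val dropped = window.dequeue()  // oldest row leaves the range
println(s"dropped $dropped, window is now $window")
```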


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread zecevicp
Github user zecevicp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193753965
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
 ---
@@ -117,101 +131,170 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
 }
 
 def makeSortMergeJoin(
-leftKeys: Seq[Expression],
-rightKeys: Seq[Expression],
-boundCondition: Option[Expression],
-leftPlan: SparkPlan,
-rightPlan: SparkPlan) = {
-  val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, 
Inner, boundCondition,
-leftPlan, rightPlan)
+   leftKeys: Seq[Expression],
+   rightKeys: Seq[Expression],
+   boundCondition: Option[Expression],
+   rangeConditions: Seq[BinaryComparison],
+   leftPlan: SparkPlan,
+   rightPlan: SparkPlan) = {
+  val sortMergeJoin = joins.SortMergeJoinExec(leftKeys, rightKeys, 
Inner, rangeConditions,
+boundCondition, leftPlan, rightPlan)
   EnsureRequirements(spark.sessionState.conf).apply(sortMergeJoin)
 }
 
-test(s"$testName using BroadcastHashJoin (build=left)") {
-  extractJoinParts().foreach { case (_, leftKeys, rightKeys, 
boundCondition, _, _) =>
-withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
-  checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, 
rightPlan: SparkPlan) =>
-makeBroadcastHashJoin(
-  leftKeys, rightKeys, boundCondition, leftPlan, rightPlan, 
joins.BuildLeft),
-expectedAnswer.map(Row.fromTuple),
-sortAnswers = true)
+val configOptions = List(
+  ("spark.sql.codegen.wholeStage", "true"),
+  ("spark.sql.codegen.wholeStage", "false"))
+
+// Disabling these because the code would never follow this path in 
case of a inner range join
+if (!expectRangeJoin) {
+  var counter = 1
--- End diff --

OK, will do that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread zecevicp
Github user zecevicp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193754271
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
 
   if (joinKeys.nonEmpty) {
 val (leftKeys, rightKeys) = joinKeys.unzip
-logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
-Some((joinType, leftKeys, rightKeys, 
otherPredicates.reduceOption(And), left, right))
+// Find any simple range expressions between two columns
+// (and involving only those two columns) of the two tables being 
joined,
+// which are not used in the equijoin expressions,
+// and which can be used for secondary sort optimizations.
+// rangePreds will contain the original expressions to be filtered 
out later.
+val rangePreds: mutable.Set[Expression] = mutable.Set.empty
--- End diff --

I think you're right.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21499
  
**[Test build #91520 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91520/testReport)**
 for PR 21499 at commit 
[`7f24206`](https://github.com/apache/spark/commit/7f242064ade10c02733e29a132aec5fc9af9b887).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21499
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91520/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21499: [SPARK-24468][SQL] Handle negative scale when adjusting ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21499
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-06-07 Thread DylanGuedes
Github user DylanGuedes commented on a diff in the pull request:

https://github.com/apache/spark/pull/21045#discussion_r193759057
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2394,6 +2394,23 @@ def array_repeat(col, count):
 return Column(sc._jvm.functions.array_repeat(_to_java_column(col), 
count))
 
 
+@since(2.4)
+def zip(*cols):
+"""
+Collection function: Merge two columns into one, such that the M-th 
element of the N-th
+argument will be the N-th field of the M-th output element.
+
+:param cols: columns in input
+
+>>> from pyspark.sql.functions import zip as spark_zip
--- End diff --

I think that we should stick with something related to zip (such as 
"zip_arrays" or "zip_lists") for "compatibility naming" with other 
APIs/languages (`Enum.zip` in Elixir and `zip` in Scala, for instance).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21502
  
**[Test build #91524 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91524/testReport)**
 for PR 21502 at commit 
[`789168e`](https://github.com/apache/spark/commit/789168e147615c50cfd67ba959ba1d43afb00ccf).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21502
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91524/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21502
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21502
  
**[Test build #91524 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91524/testReport)**
 for PR 21502 at commit 
[`789168e`](https://github.com/apache/spark/commit/789168e147615c50cfd67ba959ba1d43afb00ccf).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193762830
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.storage.BlockManager
+
+/**
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in 
an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached post which it will 
switch to a mode which
+ * would flush to disk after [[numRowsSpillThreshold]] is met (or before 
if there is
+ * excessive memory consumption). Setting these threshold involves 
following trade-offs:
+ *
+ * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory 
array may occupy more memory
+ *   than is available, resulting in OOM.
+ * - If [[numRowsSpillThreshold]] is too low, data will be spilled 
frequently and lead to
+ *   excessive disk writes. This may lead to a performance regression 
compared to the normal case
+ *   of using an [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class InMemoryUnsafeRowQueue(
+taskMemoryManager: TaskMemoryManager,
+blockManager: BlockManager,
+serializerManager: SerializerManager,
+taskContext: TaskContext,
+initialSize: Int,
+pageSizeBytes: Long,
+numRowsInMemoryBufferThreshold: Int,
+numRowsSpillThreshold: Int)
+  extends ExternalAppendOnlyUnsafeRowArray(taskMemoryManager,
+  blockManager,
+  serializerManager,
+  taskContext,
+  initialSize,
+  pageSizeBytes,
+  numRowsInMemoryBufferThreshold,
+  numRowsSpillThreshold) {
+
+  def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: 
Int) {
+this(
+  TaskContext.get().taskMemoryManager(),
+  SparkEnv.get.blockManager,
+  SparkEnv.get.serializerManager,
+  TaskContext.get(),
+  1024,
+  SparkEnv.get.memoryManager.pageSizeBytes,
+  numRowsInMemoryBufferThreshold,
+  numRowsSpillThreshold)
+  }
+
+  private val initialSizeOfInMemoryBuffer =
+Math.min(DefaultInitialSizeOfInMemoryBuffer, 
numRowsInMemoryBufferThreshold)
+
+  private val inMemoryQueue = if (initialSizeOfInMemoryBuffer > 0) {
+new mutable.Queue[UnsafeRow]()
+  } else {
+null
+  }
+
+//  private var spillableArray: UnsafeExternalSorter = _
--- End diff --

nit: Is this comment necessary?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread zecevicp
Github user zecevicp commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193763364
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.storage.BlockManager
+
+/**
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in 
an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached post which it will 
switch to a mode which
+ * would flush to disk after [[numRowsSpillThreshold]] is met (or before 
if there is
+ * excessive memory consumption). Setting these threshold involves 
following trade-offs:
+ *
+ * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory 
array may occupy more memory
+ *   than is available, resulting in OOM.
+ * - If [[numRowsSpillThreshold]] is too low, data will be spilled 
frequently and lead to
+ *   excessive disk writes. This may lead to a performance regression 
compared to the normal case
+ *   of using an [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class InMemoryUnsafeRowQueue(
+taskMemoryManager: TaskMemoryManager,
+blockManager: BlockManager,
+serializerManager: SerializerManager,
+taskContext: TaskContext,
+initialSize: Int,
+pageSizeBytes: Long,
+numRowsInMemoryBufferThreshold: Int,
+numRowsSpillThreshold: Int)
+  extends ExternalAppendOnlyUnsafeRowArray(taskMemoryManager,
+  blockManager,
+  serializerManager,
+  taskContext,
+  initialSize,
+  pageSizeBytes,
+  numRowsInMemoryBufferThreshold,
+  numRowsSpillThreshold) {
+
+  def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: 
Int) {
+this(
+  TaskContext.get().taskMemoryManager(),
+  SparkEnv.get.blockManager,
+  SparkEnv.get.serializerManager,
+  TaskContext.get(),
+  1024,
+  SparkEnv.get.memoryManager.pageSizeBytes,
+  numRowsInMemoryBufferThreshold,
+  numRowsSpillThreshold)
+  }
+
+  private val initialSizeOfInMemoryBuffer =
+Math.min(DefaultInitialSizeOfInMemoryBuffer, 
numRowsInMemoryBufferThreshold)
+
+  private val inMemoryQueue = if (initialSizeOfInMemoryBuffer > 0) {
+new mutable.Queue[UnsafeRow]()
+  } else {
+null
+  }
+
+//  private var spillableArray: UnsafeExternalSorter = _
--- End diff --

No, it's not. Thank you


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21502
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21502: [SPARK-22575][SQL] Add destroy to Dataset

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21502
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3832/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-07 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21506

[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in 
HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch measures and logs the elapsed time of each operation that
communicates with the file system (mostly remote HDFS in production) in
HDFSBackedStateStoreProvider, to help investigate any latency issues.
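
As a rough illustration of the instrumentation described above (not the actual patch; the helper name, call site, and log format are assumptions), the timing wrapper could look like this:

    // Sketch only: assumes a class that mixes in org.apache.spark.internal.Logging.
    private def timeTakenMs[T](desc: String)(body: => T): T = {
      val startTimeNs = System.nanoTime()
      val result = body                        // run the wrapped filesystem operation
      val elapsedMs = (System.nanoTime() - startTimeNs) / 1000000
      logDebug(s"$desc took $elapsedMs ms")    // DEBUG vs INFO is discussed later in the thread
      result
    }

    // Hypothetical call site inside the provider:
    // val map = timeTakenMs(s"loading state for version $version") { loadMap(version) }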

## How was this patch tested?

Manually tested.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24485

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21506.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21506


commit d84f98fc978262f4165f78b3b223b8bb3151f735
Author: Jungtaek Lim 
Date:   2018-06-07T14:14:46Z

[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in 
HDFSBackedStateStoreProvider




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21506
  
**[Test build #91525 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91525/testReport)**
 for PR 21506 at commit 
[`d84f98f`](https://github.com/apache/spark/commit/d84f98fc978262f4165f78b3b223b8bb3151f735).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21506
  
There are plenty of other debug messages that might hide the log messages added
by this patch. Would we want to log them at INFO instead of DEBUG?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21469
  
**[Test build #91523 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91523/testReport)**
 for PR 21469 at commit 
[`3c80cad`](https://github.com/apache/spark/commit/3c80cad32c056a24a7f5ffd7ab0ae3f7e096a62d).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21469
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21469
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91523/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-07 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21506
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21469
  
**[Test build #91526 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91526/testReport)**
 for PR 21469 at commit 
[`3c80cad`](https://github.com/apache/spark/commit/3c80cad32c056a24a7f5ffd7ab0ae3f7e096a62d).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21501: [SPARK-15064][ML] Locale support in StopWordsRemo...

2018-06-07 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21501#discussion_r193779131
  
--- Diff: python/pyspark/ml/feature.py ---
@@ -2582,25 +2582,27 @@ class StopWordsRemover(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadabl
                           typeConverter=TypeConverters.toListString)
     caseSensitive = Param(Params._dummy(), "caseSensitive", "whether to do a case sensitive " +
                           "comparison over the stop words", typeConverter=TypeConverters.toBoolean)
+    locale = Param(Params._dummy(), "locale", "locale of the input. ignored when case sensitive is false",
--- End diff --

`false` -> `true`. (Copy the param doc from Scala)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21501: [SPARK-15064][ML] Locale support in StopWordsRemo...

2018-06-07 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21501#discussion_r193777474
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StopWordsRemover.scala ---
@@ -84,7 +86,31 @@ class StopWordsRemover @Since("1.5.0") (@Since("1.5.0") override val uid: String
   @Since("1.5.0")
   def getCaseSensitive: Boolean = $(caseSensitive)

-  setDefault(stopWords -> StopWordsRemover.loadDefaultStopWords("english"), caseSensitive -> false)
+  /**
+   * [[https://docs.oracle.com/javase/8/docs/api/java/util/Locale.html Locale]] of the input for case insensitive
+   * matching. Ignored when [[caseSensitive]] is true.
+   * Default: Locale.getDefault.toString
+   * @see `StopWordsRemover.loadDefaultStopWords()`
+   * @group param
+   */
+  @Since("2.4.0")
+  val locale: Param[String] = new Param[String](this, "locale",
+    "Locale of the input for case insensitive matching. Ignored when caseSensitive is false.",
--- End diff --

`false` -> `true`
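
For reference, a small usage sketch of the new param (the `setLocale` setter is an assumption based on the param definition above; it is not shown in this diff):

    // Sketch only: illustrates why case-insensitive stop word matching is locale-dependent.
    import org.apache.spark.ml.feature.StopWordsRemover

    val remover = new StopWordsRemover()
      .setInputCol("raw")
      .setOutputCol("filtered")
      .setCaseSensitive(false)   // locale only matters for case-insensitive matching
      .setLocale("tr")           // Turkish: lowercasing "I" gives dotless "ı", not "i"

    // df is assumed to be a DataFrame with an array<string> column named "raw".
    val cleaned = remover.transform(df)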


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


