[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-07-06 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r126029620
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def prepareMemoryStream(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    prepareMemoryStream()
+    withTempDir { dir =>
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Checkpoint data was generated by a query with 10 shuffle partitions.
+      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
+      // since the last batch may be rerun.
--- End diff --

https://github.com/apache/spark/pull/18503#discussion_r126028838





[GitHub] spark pull request #18503: [SPARK-21271][SQL] Ensure Unsafe.sizeInBytes is a...

2017-07-06 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/18503#discussion_r126028838
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -350,20 +350,24 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
               throw new IOException(
                 s"Error reading delta file $fileToRead of $this: key size cannot be $keySize")
             } else {
-              val keyRowBuffer = new Array[Byte](keySize)
+              // If key size in an existing file is not a multiple of 8, round it to multiple of 8
+              val keyAllocationSize = ((keySize + 7) / 8) * 8
+              val keyRowBuffer = new Array[Byte](keyAllocationSize)
--- End diff --
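
A quick sanity check of the rounding formula in the diff above (an editorial
sketch, not part of the PR): key sizes that are already multiples of 8 stay
unchanged, anything else is rounded up to the next multiple of 8.

```scala
// ((keySize + 7) / 8) * 8 rounds an Int up to the nearest multiple of 8
def roundUpToMultipleOf8(keySize: Int): Int = ((keySize + 7) / 8) * 8

assert(roundUpToMultipleOf8(16) == 16)  // already aligned
assert(roundUpToMultipleOf8(13) == 16)  // rounded up
assert(roundUpToMultipleOf8(1) == 8)
```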

Regen would involve running a query that looks something like:

- set # shuffle partitions to 10
- add 1,2,3,4
- processAllAvail
- add 3,4,5,6
- processAllAvail
- stop

The checkpoint generated by this query should do it. It should be generated
with the Spark 2.1 branch; a rough sketch of such a query is below.
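
An editorial sketch of such a regeneration query, assuming a spark-shell style
SparkSession named `spark` (the checkpoint path and query name are
illustrative, not from this thread):

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.count
import spark.implicits._

// 10 shuffle partitions, as described above
spark.conf.set("spark.sql.shuffle.partitions", "10")

implicit val sqlCtx = spark.sqlContext
val inputData = MemoryStream[Int]

val query = inputData.toDF()
  .groupBy($"value")
  .agg(count("*"))
  .writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("regen")
  .option("checkpointLocation", "/tmp/checkpoint-version-2.1.0")  // checkpoint to keep
  .start()

inputData.addData(1, 2, 3, 4)
query.processAllAvailable()
inputData.addData(3, 4, 5, 6)
query.processAllAvailable()
query.stop()
```

Run on the Spark 2.1 branch, the files left under the checkpoint location are
what the StreamSuite test above copies into a temp dir.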





[GitHub] spark pull request #17765: [SPARK-20464][SS] Add a job group and description...

2017-04-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17765#discussion_r113831182
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -825,6 +832,11 @@ class StreamExecution(
 }
   }
 
+  private def getBatchDescriptionString: String = {
+    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
+    Option(name).map(_ + "<br/>").getOrElse("") +
+      s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
--- End diff --

@marmbrus @zsxwing @tdas Updated as per comments, the screenshots are in 
the PR description.





[GitHub] spark pull request #17765: [SPARK-20464][SS] Add a job group and description...

2017-04-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17765#discussion_r113830998
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -825,6 +832,11 @@ class StreamExecution(
 }
   }
 
+  private def getBatchDescriptionString: String = {
+    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
+    Option(name).map(_ + " ").getOrElse("") +
+      s"[batch = $batchDescription,id = $id,runId = $runId]"
--- End diff --

Yes, updated.





[GitHub] spark pull request #17765: [SPARK-20464][SS] Add a job group and description...

2017-04-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17765#discussion_r113825863
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -825,6 +832,11 @@ class StreamExecution(
 }
   }
 
+  private def getBatchDescriptionString: String = {
+    val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
+    Option(name).map(_ + " ").getOrElse("") +
+      s"[batch = $batchDescription,id = $id,runId = $runId]"
--- End diff --

@marmbrus @zsxwing @tdas Is this format good for description?
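
For reference, a standalone rendering of the format in the quoted diff, with
made-up values for `name`, `id` and `runId` (UUID strings in the real code):

```scala
val name = "my-query"
val currentBatchId = 3L
val id = "7e1c0fa2"      // illustrative; a UUID in StreamExecution
val runId = "2f9a57b1"   // illustrative; a UUID in StreamExecution

val batchDescription = if (currentBatchId < 0) "init" else currentBatchId.toString
val desc = Option(name).map(_ + " ").getOrElse("") +
  s"[batch = $batchDescription,id = $id,runId = $runId]"
// desc == "my-query [batch = 3,id = 7e1c0fa2,runId = 2f9a57b1]"
```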





[GitHub] spark pull request #17765: [SPARK-20464][SS] Add a job group and description...

2017-04-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17765#discussion_r113825791
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -308,6 +311,7 @@ class StreamExecution(
   logDebug(s"batch ${currentBatchId} committed")
   // We'll increase currentBatchId after we complete 
processing current batch's data
   currentBatchId += 1
+  
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
--- End diff --

Update job description with updated `currentBatchId` after each batch.





[GitHub] spark pull request #17765: [SPARK-20464][SS] Add a job group and description...

2017-04-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17765#discussion_r113825741
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -289,6 +291,7 @@ class StreamExecution(
          if (currentBatchId < 0) {
            // We'll do this initialization only once
            populateStartOffsets(sparkSessionToRunBatches)
+           sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
--- End diff --

Update job description with correct currentBatchId after initializing 
starting offsets.





[GitHub] spark pull request #17765: [SPARK-20464][SS] Add a job group and description...

2017-04-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17765#discussion_r113825644
  
--- Diff: core/src/main/scala/org/apache/spark/ui/UIUtils.scala ---
@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
      val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")

      // Verify that this has only anchors and span (we are wrapping in span)
-      val allowedNodeLabels = Set("a", "span")
+      val allowedNodeLabels = Set("a", "span", "br")
--- End diff --

@tdas Is this change okay? Need it to add line breaks in the job 
description cells.





[GitHub] spark pull request #17765: [SPARK-20464][SS] Add a job group and description...

2017-04-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17765#discussion_r113817633
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -252,6 +252,7 @@ class StreamExecution(
*/
   private def runBatches(): Unit = {
 try {
+      sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString)
--- End diff --

Had this set to false due to HDFS-1208, but setting it to true since the 
HDFS bug is 7 years old.
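
For context, a minimal sketch of the API referred to above, assuming a
SparkContext `sc` (the group id and description values are illustrative): the
third parameter of `setJobGroup` is `interruptOnCancel`, the flag discussed in
this comment.

```scala
// Group all jobs of this streaming query run and allow cancelJobGroup to
// interrupt running tasks (the HDFS-1208 caveat mentioned above concerned
// interrupting threads doing HDFS I/O).
sc.setJobGroup("run-id-1234", "my-query [batch = 0]", interruptOnCancel = true)
```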





[GitHub] spark pull request #17765: [SPARK-20464][SS] Add a job group and an informat...

2017-04-25 Thread kunalkhamar
GitHub user kunalkhamar opened a pull request:

https://github.com/apache/spark/pull/17765

[SPARK-20464][SS] Add a job group and an informative description for 
streaming queries

## What changes were proposed in this pull request?

This change makes it convenient to group the batches of a query by its name 
in the Spark Jobs UI.
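
For illustration, a minimal sketch of a named query whose batches this change
groups in the UI; it assumes a SparkSession `spark` and the built-in `rate`
test source (names are illustrative, not from the PR):

```scala
val query = spark.readStream
  .format("rate")            // test source emitting (timestamp, value) rows
  .load()
  .writeStream
  .queryName("my-query")     // the name that shows up in the job description
  .format("console")
  .start()
// Each micro-batch now appears in the Jobs UI under this query's job group.
```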

## How was this patch tested?

![screen shot 2017-04-25 at 3 25 43 pm](https://cloud.githubusercontent.com/assets/7865120/25410424/8bbf3830-29cb-11e7-9602-6bb919c54f0c.png)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kunalkhamar/spark sc-6696

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17765.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17765


commit 07e182ba36fa0499a8da1a2b480030b6785f78a5
Author: Kunal Khamar <kkha...@outlook.com>
Date:   2017-04-25T22:23:10Z

Set job group for streaming query batches.







[GitHub] spark pull request #17761: [SPARK-20461][Core][SS]Use UninterruptibleThread ...

2017-04-25 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17761#discussion_r113274024
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -62,11 +63,20 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   case class AvailableOffsetRange(earliest: Long, latest: Long)
 
+  private def runUninterruptiblyIfPossiable[T](body: => T): T = Thread.currentThread match {
--- End diff --

nit: rename `runUninterruptiblyIfPossible`





[GitHub] spark issue #17486: [SPARK-20164][SQL] AnalysisException not tolerant of nul...

2017-03-31 Thread kunalkhamar
Github user kunalkhamar commented on the issue:

https://github.com/apache/spark/pull/17486
  
@gatorsmile Verified the behaviour using the snippet below; it makes `plan`
null upon deserialization.
```
import java.io._
import org.apache.spark.sql.AnalysisException

lazy val exception = new AnalysisException("", None, None, plan = None)
// Serialize exception
lazy val bo = new ByteArrayOutputStream()
lazy val o = new ObjectOutputStream(bo)
o.writeObject(exception)
lazy val bytes = bo.toByteArray

// Deserialize ex
lazy val bi = new ByteArrayInputStream(bytes)
lazy val i = new ObjectInputStream(bi)
lazy val deserialized = i.readObject.asInstanceOf[AnalysisException]

println(deserialized.plan)
```

Not sure what to add and where in the scala-style-guide?





[GitHub] spark pull request #17486: [SPARK-20164][SQL] AnalysisException not tolerant...

2017-03-30 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17486#discussion_r109081024
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala ---
@@ -43,7 +43,7 @@ class AnalysisException protected[sql] (
   }
 
   override def getMessage: String = {
-val planAnnotation = plan.map(p => s";\n$p").getOrElse("")
+val planAnnotation = Option(plan).map(p => s";\n$p").getOrElse("")
--- End diff --

Right, updated.





[GitHub] spark issue #17486: [SPARK-20164][SQL] AnalysisException not tolerant of nul...

2017-03-30 Thread kunalkhamar
Github user kunalkhamar commented on the issue:

https://github.com/apache/spark/pull/17486
  
@gatorsmile Looks like it's happening because `plan` is marked `transient`:
when AnalysisException is serialized and then deserialized, `plan` may turn up
null.





[GitHub] spark pull request #17486: [SPARK-20164][SQL] AnalysisException not tolerant...

2017-03-30 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17486#discussion_r109057059
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala ---
@@ -43,7 +43,7 @@ class AnalysisException protected[sql] (
   }
 
   override def getMessage: String = {
-    val planAnnotation = plan.map(p => s";\n$p").getOrElse("")
+    val planAnnotation = if (plan == null) "" else plan.map(p => s";\n$p").getOrElse("")
--- End diff --

Changed.





[GitHub] spark pull request #17486: [SPARK-20164][SQL][WIP] AnalysisException not tol...

2017-03-30 Thread kunalkhamar
GitHub user kunalkhamar opened a pull request:

https://github.com/apache/spark/pull/17486

[SPARK-20164][SQL][WIP] AnalysisException not tolerant of null query plan.

## What changes were proposed in this pull request?

When someone throws an AnalysisException with a null query plan (which
ideally no one should), getMessage is not tolerant of this and throws a null
pointer exception, losing the information in the original exception.
The fix is to add a null check in getMessage.
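
A minimal standalone illustration of the pattern the fix relies on (editorial,
not the PR code): `Option(x)` maps a null reference to `None`, so the chained
calls never dereference null.

```scala
// Simulates the @transient plan field coming back null after deserialization.
val plan: Option[String] = null

val planAnnotation = Option(plan).flatten.map(p => s";\n$p").getOrElse("")
assert(planAnnotation == "")  // no NullPointerException, just an empty annotation
```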

## How was this patch tested?

- Unit test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kunalkhamar/spark spark-20164

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17486.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17486


commit e3c1970a3a715e8e8042384bb81bcc86730d6d81
Author: Kunal Khamar <kkha...@outlook.com>
Date:   2017-03-30T21:11:58Z

Add null check to AnalysisException.







[GitHub] spark issue #17379: [SPARK-20048][SQL] Cloning SessionState does not clone q...

2017-03-29 Thread kunalkhamar
Github user kunalkhamar commented on the issue:

https://github.com/apache/spark/pull/17379
  
@hvanhovell it should be HiveSessionStateBuilder now





[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-29 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r108748031
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
 ---
@@ -134,6 +135,14 @@ abstract class BaseSessionStateBuilder(
   }
 
   /**
+   * Interface exposed to the user for registering user-defined functions.
+   *
+   * Note 1: The user-defined functions must be deterministic.
+   * Note 2: This depends on the `functionRegistry` field.
+   */
+  protected def udfRegistration: UDFRegistration = new UDFRegistration(functionRegistry)
--- End diff --

It was the only thing not initialized in the builder; I thought it would be
more consistent to have all initialization happen in the builder. Is this okay?





[GitHub] spark issue #17379: [SPARK-20048][SQL] Cloning SessionState does not clone q...

2017-03-29 Thread kunalkhamar
Github user kunalkhamar commented on the issue:

https://github.com/apache/spark/pull/17379
  
cc @hvanhovell 





[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-28 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r108586704
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -32,15 +32,15 @@ import 
org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLo
  */
 private[hive] object HiveSessionState {
   /**
-   * Create a new Hive aware [[SessionState]]. for the given session.
+   * Create a new Hive aware [[SessionState]] for the given session.
*/
   def apply(session: SparkSession): SessionState = {
 new HiveSessionStateBuilder(session).build()
   }
 }
 
 /**
- * Builder that produces a [[HiveSessionState]].
+ * Builder that produces a Hive aware [[SessionState]].
  */
 @Experimental
 @InterfaceStability.Unstable
--- End diff --

Renamed, removed `object HiveSessionState`.





[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-28 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r108586634
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/sessionStateBuilders.scala
 ---
@@ -134,6 +135,14 @@ abstract class BaseSessionStateBuilder(
   }
 
   /**
+   * Interface exposed to the user for registering user-defined functions.
+   *
+   * Note 1: The user-defined functions must be deterministic.
+   * Note 2: This depends on the `functionRegistry` field.
+   */
+  protected def udf: UDFRegistration = new UDFRegistration(functionRegistry)
--- End diff --

changed.





[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-28 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r108586629
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -37,38 +37,42 @@ import 
org.apache.spark.sql.util.ExecutionListenerManager
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]].
  *
- * @param sparkContext The [[SparkContext]].
- * @param sharedState The shared state.
+ * @param sharedState The state shared across sessions, e.g. global view 
manager, external catalog.
  * @param conf SQL-specific key-value configurations.
- * @param experimentalMethods The experimental methods.
+ * @param experimentalMethods Interface to add custom planning strategies 
and optimizers.
  * @param functionRegistry Internal catalog for managing functions 
registered by the user.
+ * @param udf Interface exposed to the user for registering user-defined 
functions.
  * @param catalog Internal catalog for managing table and database states.
  * @param sqlParser Parser that extracts expressions, plans, table 
identifiers etc. from SQL texts.
  * @param analyzer Logical query plan analyzer for resolving unresolved 
attributes and relations.
  * @param optimizer Logical query plan optimizer.
  * @param planner Planner that converts optimized logical plans to 
physical plans
  * @param streamingQueryManager Interface to start and stop streaming 
queries.
+ * @param listenerManager Interface to register custom
+ *
[[org.apache.spark.sql.util.QueryExecutionListener]]s
+ * @param resourceLoader Session shared resource loader to load JARs, 
files, etc
  * @param createQueryExecution Function used to create QueryExecution 
objects.
  * @param createClone Function used to create clones of the session state.
  */
 private[sql] class SessionState(
-sparkContext: SparkContext,
 sharedState: SharedState,
 val conf: SQLConf,
 val experimentalMethods: ExperimentalMethods,
 val functionRegistry: FunctionRegistry,
+val udf: UDFRegistration,
--- End diff --

changed.





[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-28 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r108532483
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -38,10 +38,7 @@ import org.apache.spark.sql.util.ExecutionListenerManager
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]].
- * @param sparkContext The [[SparkContext]].
- * @param sharedState The shared state.
  * @param conf SQL-specific key-value configurations.
- * @param experimentalMethods The experimental methods.
--- End diff --

Added it back, hopefully more descriptive.





[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-28 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r108532136
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -134,17 +131,20 @@ private[sql] class SessionState(
 
 SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
 
-new SessionState(
-  sparkContext,
-  newSparkSession.sharedState,
-  confCopy,
-  experimentalMethods.clone(),
-  functionRegistryCopy,
-  catalogCopy,
-  sqlParser,
-  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
-  new StreamingQueryManager(newSparkSession),
-  queryExecutionCreator)
+val newSessionState = new SessionState(
+sparkContext,
+newSparkSession.sharedState,
+confCopy,
+experimentalMethods.clone(),
+functionRegistryCopy,
+catalogCopy,
+sqlParser,
+SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+new StreamingQueryManager(newSparkSession),
+queryExecutionCreator) {
+  override val listenerManager: ExecutionListenerManager = self.listenerManager.clone()
--- End diff --

Builders were added in https://github.com/apache/spark/pull/17433. Changed
initialization to use that pattern, so it is a constructor param now.





[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-28 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r108531631
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -32,19 +32,9 @@ import 
org.apache.spark.sql.streaming.StreamingQueryManager
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]] backed by Hive.
- * @param sparkContext The [[SparkContext]].
- * @param sharedState The shared state.
- * @param conf SQL-specific key-value configurations.
- * @param experimentalMethods The experimental methods.
- * @param functionRegistry Internal catalog for managing functions 
registered by the user.
  * @param catalog Internal catalog for managing table and database states 
that uses Hive client for
  *interacting with the metastore.
- * @param sqlParser Parser that extracts expressions, plans, table 
identifiers etc. from SQL texts.
  * @param metadataHive The Hive metadata client.
- * @param analyzer Logical query plan analyzer for resolving unresolved 
attributes and relations.
- * @param streamingQueryManager Interface to start and stop
- *  
[[org.apache.spark.sql.streaming.StreamingQuery]]s.
- * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from 
a [[LogicalPlan]]
--- End diff --

`HiveSessionState` is gone, so this is not relevant anymore.





[GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...

2017-03-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17433#discussion_r108289611
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---
@@ -192,36 +196,21 @@ private[hive] class TestHiveSparkSession(
 
   @transient
   override lazy val sessionState: HiveSessionState = {
-val testConf =
-  new SQLConf {
-clear()
-override def caseSensitiveAnalysis: Boolean = 
getConf(SQLConf.CASE_SENSITIVE, false)
-override def clear(): Unit = {
-  super.clear()
-  TestHiveContext.overrideConfs.foreach { case (k, v) => 
setConfString(k, v) }
-}
-  }
-val queryExecutionCreator = (plan: LogicalPlan) => new 
TestHiveQueryExecution(this, plan)
-val initHelper = HiveSessionState(this, testConf)
-SessionState.mergeSparkConf(testConf, sparkContext.getConf)
-
-new HiveSessionState(
-  sparkContext,
-  sharedState,
-  testConf,
-  initHelper.experimentalMethods,
-  initHelper.functionRegistry,
-  initHelper.catalog,
-  initHelper.sqlParser,
-  initHelper.metadataHive,
-  initHelper.analyzer,
-  initHelper.streamingQueryManager,
-  queryExecutionCreator,
-  initHelper.plannerCreator)
+new TestHiveSessionStateBuilder(this, parentSessionState).build
--- End diff --

`.build()`





[GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...

2017-03-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17433#discussion_r108293545
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -18,20 +18,23 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner, 
SparkSqlParser}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal._
--- End diff --

`import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionFunctionResourceLoader, SessionState, SharedState, SQLConf}`





[GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...

2017-03-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17433#discussion_r108293251
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala ---
@@ -19,7 +19,7 @@ package org.apache.spark.sql.test
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal._
--- End diff --

`import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder, SQLConf, WithTestConf}`





[GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...

2017-03-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17433#discussion_r108282976
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -121,150 +124,115 @@ private[hive] class HiveSessionState(
   def hiveThriftServerAsync: Boolean = {
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
+}
 
+private[hive] object HiveSessionState {
   /**
-   * Get an identical copy of the `HiveSessionState`.
-   * This should ideally reuse the `SessionState.clone` but cannot do so.
-   * Doing that will throw an exception when trying to clone the catalog.
+   * Create a new [[HiveSessionState]] for the given session.
*/
-  override def clone(newSparkSession: SparkSession): HiveSessionState = {
-val sparkContext = newSparkSession.sparkContext
-val confCopy = conf.clone()
-val functionRegistryCopy = functionRegistry.clone()
-val experimentalMethodsCopy = experimentalMethods.clone()
-val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
-val catalogCopy = catalog.newSessionCatalogWith(
-  newSparkSession,
-  confCopy,
-  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
-  functionRegistryCopy,
-  sqlParser)
-val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
-
-val hiveClient =
-  
newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-.newSession()
-
-SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
-
-new HiveSessionState(
-  sparkContext,
-  newSparkSession.sharedState,
-  confCopy,
-  experimentalMethodsCopy,
-  functionRegistryCopy,
-  catalogCopy,
-  sqlParser,
-  hiveClient,
-  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, 
confCopy),
-  new StreamingQueryManager(newSparkSession),
-  queryExecutionCreator,
-  HiveSessionState.createPlannerCreator(
-newSparkSession,
-confCopy,
-experimentalMethodsCopy))
+  def apply(session: SparkSession): SessionState = {
+new HiveSessionStateBuilder(session).build()
   }
-
 }
 
-private[hive] object HiveSessionState {
-
-  def apply(sparkSession: SparkSession): HiveSessionState = {
-apply(sparkSession, new SQLConf)
-  }
-
-  def apply(sparkSession: SparkSession, conf: SQLConf): HiveSessionState = 
{
-val initHelper = SessionState(sparkSession, conf)
-
-val sparkContext = sparkSession.sparkContext
-
-val catalog = HiveSessionCatalog(
-  sparkSession,
-  initHelper.functionRegistry,
-  initHelper.conf,
-  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
initHelper.conf),
-  initHelper.sqlParser)
-
-val metadataHive: HiveClient =
-  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-.newSession()
-
-val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, 
initHelper.conf)
+/**
+ * Builder that produces a [[HiveSessionState]].
+ */
+@Experimental
+@InterfaceStability.Unstable
+class HiveSessionStateBuilder(session: SparkSession, parentState: 
Option[SessionState] = None)
+  extends BaseSessionStateBuilder(session, parentState) {
 
-val plannerCreator = createPlannerCreator(
-  sparkSession,
-  initHelper.conf,
-  initHelper.experimentalMethods)
+  private def externalCatalog: HiveExternalCatalog =
+session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
 
-val hiveSessionState = new HiveSessionState(
-  sparkContext,
-  sparkSession.sharedState,
-  initHelper.conf,
-  initHelper.experimentalMethods,
-  initHelper.functionRegistry,
-  catalog,
-  initHelper.sqlParser,
-  metadataHive,
-  analyzer,
-  initHelper.streamingQueryManager,
-  initHelper.queryExecutionCreator,
-  plannerCreator)
-catalog.functionResourceLoader = 
hiveSessionState.functionResourceLoader
-hiveSessionState
+  /**
+   * Create a [[HiveSessionCatalog]].
+   */
+  override protected lazy val catalog: HiveSessionCatalog = {
+val catalog = new HiveSessionCatalog(
+  externalCatalog,
+  session.sharedState.globalTempViewManager,
+  new HiveMetastoreCatalog(session),
+  functionRegistry,
+  conf,
+  SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, 
conf),
+  sqlParser,
+  new SessionFunctionResourceLoa

[GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...

2017-03-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17433#discussion_r108265096
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -179,88 +132,295 @@ private[sql] class SessionState(
   }
 }
 
-
 private[sql] object SessionState {
+  /**
+   * Create a new [[SessionState]] for the given session.
+   */
+  def apply(session: SparkSession): SessionState = {
+new SessionStateBuilder(session).build()
+  }
+
+  def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
+val newHadoopConf = new Configuration(hadoopConf)
+sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) 
newHadoopConf.set(k, v) }
+newHadoopConf
+  }
+}
+
+/**
+ * Builder class that coordinates construction of a new [[SessionState]].
+ *
+ * The builder explicitly defines all components needed by the session 
state, and creates a session
+ * state when `build` is called. Components should only be initialized 
once. This is not a problem
+ * for most components as they are only used in the `build` function. 
However some components
+ * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & 
`sqlParser`) are as dependencies
+ * for other components and are shared as a result. These components are 
defined as lazy vals to
+ * make sure the component is created only once.
+ *
+ * A developer can modify the builder by providing custom versions of 
components, or by using the
+ * hooks provided for the analyzer, optimizer & planner. There are some 
dependencies between the
+ * components (they are documented per dependency), a developer should 
respect these when making
+ * modifications in order to prevent initialization problems.
+ *
+ * A parent [[SessionState]] can be used to initialize the new 
[[SessionState]]. The new session
+ * state will clone the parent sessions state's `conf`, 
`functionRegistry`, `experimentalMethods`
+ * and `catalog` fields. Note that the state is cloned when `build` is 
called, and not before.
+ */
+@Experimental
+@InterfaceStability.Unstable
+abstract class BaseSessionStateBuilder(
+val session: SparkSession,
+val parentState: Option[SessionState] = None) {
+  type NewBuilder = (SparkSession, Option[SessionState]) => 
BaseSessionStateBuilder
 
-  def apply(sparkSession: SparkSession): SessionState = {
-apply(sparkSession, new SQLConf)
+  /**
+   * Extract entries from `SparkConf` and put them in the `SQLConf`
+   */
+  protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): 
Unit = {
+sparkConf.getAll.foreach { case (k, v) =>
+  sqlConf.setConfString(k, v)
+}
   }
 
-  def apply(sparkSession: SparkSession, sqlConf: SQLConf): SessionState = {
-val sparkContext = sparkSession.sparkContext
+  /**
+   * SQL-specific key-value configurations.
+   *
+   * These either get cloned from a pre-existing instance or newly 
created. The conf is always
+   * merged with its [[SparkConf]].
+   */
+  protected lazy val conf: SQLConf = {
+val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf)
+mergeSparkConf(conf, session.sparkContext.conf)
+conf
+  }
 
-// Automatically extract all entries and put them in our SQLConf
-mergeSparkConf(sqlConf, sparkContext.getConf)
+  /**
+   * Internal catalog managing functions registered by the user.
+   *
+   * This either gets cloned from a pre-existing version or cloned from 
the build-in registry.
--- End diff --

super nit: built-in registry





[GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...

2017-03-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17433#discussion_r108275733
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -179,88 +132,295 @@ private[sql] class SessionState(
   }
 }
 
-
 private[sql] object SessionState {
+  /**
+   * Create a new [[SessionState]] for the given session.
+   */
+  def apply(session: SparkSession): SessionState = {
+new SessionStateBuilder(session).build
+  }
+
+  def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
+val newHadoopConf = new Configuration(hadoopConf)
+sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) 
newHadoopConf.set(k, v) }
+newHadoopConf
+  }
+}
+
+/**
+ * Builder class that coordinates construction of a new [[SessionState]].
+ *
+ * The builder explicitly defines all components needed by the session 
state, and creates a session
+ * state when `build` is called. Components should only be initialized 
once. This is not a problem
+ * for most components as they are only used in the `build` function. 
However some components
+ * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & 
`sqlParser`) are as dependencies
+ * for other components and are shared as a result. These components are 
defined as lazy vals to
+ * make sure the component is created only once.
+ *
+ * A developer can modify the builder by providing custom versions of 
components, or by using the
+ * hooks provided for the analyzer, optimizer & planner. There are some 
dependencies between the
+ * components (they are documented per dependency), a developer should 
respect these when making
+ * modifications in order to prevent initialization problems.
+ *
+ * A parent [[SessionState]] can be used to initialize the new 
[[SessionState]]. The new session
+ * state will clone the parent sessions state's `conf`, 
`functionRegistry`, `experimentalMethods`
+ * and `catalog` fields. Note that the state is cloned when `build` is 
called, and not before.
+ */
+@Experimental
+@InterfaceStability.Unstable
+abstract class BaseSessionStateBuilder(
--- End diff --

That may be a good idea.
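
For reference, a minimal concrete builder under the API sketched in the quoted
doc comment; `newBuilder` (of the `NewBuilder` type declared above) is assumed
here to be the hook a subclass supplies so cloned sessions can re-create the
builder:

```scala
class MySessionStateBuilder(
    session: SparkSession,
    parentState: Option[SessionState] = None)
  extends BaseSessionStateBuilder(session, parentState) {

  // Re-create this builder for a (possibly cloned) session.
  override protected def newBuilder: NewBuilder = new MySessionStateBuilder(_, _)
}
```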





[GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...

2017-03-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17433#discussion_r108292179
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -121,150 +124,115 @@ private[hive] class HiveSessionState(
   def hiveThriftServerAsync: Boolean = {
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
+}
 
+private[hive] object HiveSessionState {
   /**
-   * Get an identical copy of the `HiveSessionState`.
-   * This should ideally reuse the `SessionState.clone` but cannot do so.
-   * Doing that will throw an exception when trying to clone the catalog.
+   * Create a new [[HiveSessionState]] for the given session.
*/
-  override def clone(newSparkSession: SparkSession): HiveSessionState = {
-val sparkContext = newSparkSession.sparkContext
-val confCopy = conf.clone()
-val functionRegistryCopy = functionRegistry.clone()
-val experimentalMethodsCopy = experimentalMethods.clone()
-val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
-val catalogCopy = catalog.newSessionCatalogWith(
-  newSparkSession,
-  confCopy,
-  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
-  functionRegistryCopy,
-  sqlParser)
-val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
-
-val hiveClient =
-  
newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-.newSession()
-
-SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
-
-new HiveSessionState(
-  sparkContext,
-  newSparkSession.sharedState,
-  confCopy,
-  experimentalMethodsCopy,
-  functionRegistryCopy,
-  catalogCopy,
-  sqlParser,
-  hiveClient,
-  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, 
confCopy),
-  new StreamingQueryManager(newSparkSession),
-  queryExecutionCreator,
-  HiveSessionState.createPlannerCreator(
-newSparkSession,
-confCopy,
-experimentalMethodsCopy))
+  def apply(session: SparkSession): SessionState = {
--- End diff --

`: HiveSessionState = {`





[GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...

2017-03-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17433#discussion_r108288085
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -179,88 +132,295 @@ private[sql] class SessionState(
   }
 }
 
-
 private[sql] object SessionState {
+  /**
+   * Create a new [[SessionState]] for the given session.
+   */
+  def apply(session: SparkSession): SessionState = {
+new SessionStateBuilder(session).build
+  }
+
+  def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
+val newHadoopConf = new Configuration(hadoopConf)
+sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) 
newHadoopConf.set(k, v) }
+newHadoopConf
+  }
+}
+
+/**
+ * Builder class that coordinates construction of a new [[SessionState]].
+ *
+ * The builder explicitly defines all components needed by the session 
state, and creates a session
+ * state when `build` is called. Components should only be initialized 
once. This is not a problem
+ * for most components as they are only used in the `build` function. 
However some components
+ * (`conf`, `catalog`, `functionRegistry`, `experimentalMethods` & 
`sqlParser`) are as dependencies
+ * for other components and are shared as a result. These components are 
defined as lazy vals to
+ * make sure the component is created only once.
+ *
+ * A developer can modify the builder by providing custom versions of 
components, or by using the
+ * hooks provided for the analyzer, optimizer & planner. There are some 
dependencies between the
+ * components (they are documented per dependency), a developer should 
respect these when making
+ * modifications in order to prevent initialization problems.
+ *
+ * A parent [[SessionState]] can be used to initialize the new 
[[SessionState]]. The new session
+ * state will clone the parent sessions state's `conf`, 
`functionRegistry`, `experimentalMethods`
+ * and `catalog` fields. Note that the state is cloned when `build` is 
called, and not before.
+ */
+@Experimental
+@InterfaceStability.Unstable
+abstract class BaseSessionStateBuilder(
+val session: SparkSession,
+val parentState: Option[SessionState] = None) {
+  type NewBuilder = (SparkSession, Option[SessionState]) => 
BaseSessionStateBuilder
 
-  def apply(sparkSession: SparkSession): SessionState = {
-apply(sparkSession, new SQLConf)
+  /**
+   * Extract entries from `SparkConf` and put them in the `SQLConf`
+   */
+  protected def mergeSparkConf(sqlConf: SQLConf, sparkConf: SparkConf): 
Unit = {
+sparkConf.getAll.foreach { case (k, v) =>
+  sqlConf.setConfString(k, v)
+}
   }
 
-  def apply(sparkSession: SparkSession, sqlConf: SQLConf): SessionState = {
-val sparkContext = sparkSession.sparkContext
+  /**
+   * SQL-specific key-value configurations.
+   *
+   * These either get cloned from a pre-existing instance or newly 
created. The conf is always
+   * merged with its [[SparkConf]].
+   */
+  protected lazy val conf: SQLConf = {
+val conf = parentState.map(_.conf.clone()).getOrElse(new SQLConf)
+mergeSparkConf(conf, session.sparkContext.conf)
+conf
+  }
 
-// Automatically extract all entries and put them in our SQLConf
-mergeSparkConf(sqlConf, sparkContext.getConf)
+  /**
+   * Internal catalog managing functions registered by the user.
+   *
+   * This either gets cloned from a pre-existing version or cloned from 
the build-in registry.
+   */
+  protected lazy val functionRegistry: FunctionRegistry = {
+
parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
+  }
 
-val functionRegistry = FunctionRegistry.builtin.clone()
+  /**
+   * Experimental methods that can be used to define custom optimization 
rules and custom planning
+   * strategies.
+   *
+   * This either gets cloned from a pre-existing version or newly created.
+   */
+  protected lazy val experimentalMethods: ExperimentalMethods = {
+parentState.map(_.experimentalMethods.clone()).getOrElse(new 
ExperimentalMethods)
+  }
 
-val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
+  /**
+   * Parser that extracts expressions, plans, table identifiers etc. from 
SQL texts.
+   *
+   * Note: this depends on the `conf` field.
+   */
+  protected lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
 
+  /**
+   * Catalog for managing table and database states. If there is a 
pre-existing catalog, the state
+   * of that catalog (temp tables & current dat

[GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...

2017-03-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17433#discussion_r108281320
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -40,12 +43,13 @@ import 
org.apache.spark.sql.streaming.StreamingQueryManager
  * @param catalog Internal catalog for managing table and database states 
that uses Hive client for
  *interacting with the metastore.
  * @param sqlParser Parser that extracts expressions, plans, table 
identifiers etc. from SQL texts.
- * @param metadataHive The Hive metadata client.
  * @param analyzer Logical query plan analyzer for resolving unresolved 
attributes and relations.
- * @param streamingQueryManager Interface to start and stop
- *  
[[org.apache.spark.sql.streaming.StreamingQuery]]s.
- * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from 
a [[LogicalPlan]]
- * @param plannerCreator Lambda to create a planner that takes into 
account Hive-specific strategies
+ * @param optimizer Logical query plan optimizer.
+ * @param planner Planner that converts optimized logical plans to 
physical plans
--- End diff --

nit: the comment used to include "planner takes into account Hive-specific 
strategies"; let's add that back for completeness?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17433: [SPARK-20100][SQL] Refactor SessionState initiali...

2017-03-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17433#discussion_r108290677
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala ---
@@ -35,16 +35,9 @@ private[sql] class TestSparkSession(sc: SparkContext) 
extends SparkSession(sc) {
   }
 
   @transient
-  override lazy val sessionState: SessionState = SessionState(
-this,
-new SQLConf {
-  clear()
-  override def clear(): Unit = {
-super.clear()
-// Make sure we start with the default test configs even after 
clear
-TestSQLContext.overrideConfs.foreach { case (key, value) => 
setConfString(key, value) }
-  }
-})
+  override lazy val sessionState: SessionState = {
+new TestSQLSessionStateBuilder(this, None).build
--- End diff --

`.build()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-23 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r107821546
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -134,17 +131,20 @@ private[sql] class SessionState(
 
 SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
 
-new SessionState(
-  sparkContext,
-  newSparkSession.sharedState,
-  confCopy,
-  experimentalMethods.clone(),
-  functionRegistryCopy,
-  catalogCopy,
-  sqlParser,
-  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
-  new StreamingQueryManager(newSparkSession),
-  queryExecutionCreator)
+val newSessionState = new SessionState(
+sparkContext,
+newSparkSession.sharedState,
+confCopy,
+experimentalMethods.clone(),
+functionRegistryCopy,
+catalogCopy,
+sqlParser,
+SessionState.createAnalyzer(newSparkSession, catalogCopy, 
confCopy),
+new StreamingQueryManager(newSparkSession),
+queryExecutionCreator) {
+  override val listenerManager: ExecutionListenerManager = 
self.listenerManager.clone()
--- End diff --

I think the general rule we followed in the cloneSession PR is that `val`s 
that directly depend on `SparkSession` for initialization become constructor 
params, while the other `val`s stay in the body of the class. 
The advantage is less duplicated code between `SessionState`, 
`HiveSessionState` and `TestHiveSparkSession`.
This is consistent with that; what would be the advantage of changing it to 
be a param?
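
For illustration, a minimal sketch of the two options (the class and method 
names below are simplified stand-ins, not the actual Spark code):

    class ListenerManager {
      def copyManager(): ListenerManager = new ListenerManager
    }

    // Option A (current approach): keep the val in the class body and override it
    // in an anonymous subclass when cloning, so all subclasses share one constructor.
    class State(val conf: Map[String, String]) { self =>
      val listenerManager: ListenerManager = new ListenerManager

      def cloneState(): State = new State(conf) {
        override val listenerManager: ListenerManager = self.listenerManager.copyManager()
      }
    }

    // Option B (the suggestion): make it a constructor param; SessionState,
    // HiveSessionState, etc. would each need to declare and pass it explicitly.
    class StateAsParam(val conf: Map[String, String], val listenerManager: ListenerManager)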


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-23 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r107820985
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala ---
@@ -122,6 +125,59 @@ class SessionStateSuite extends SparkFunSuite
 }
   }
 
+  test("fork new session and inherit listener manager") {
+def createTestListener: (ArrayBuffer[String], QueryExecutionListener) 
= {
--- End diff --

neat, changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-23 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r107819804
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -38,10 +38,7 @@ import org.apache.spark.sql.util.ExecutionListenerManager
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]].
- * @param sparkContext The [[SparkContext]].
- * @param sharedState The shared state.
  * @param conf SQL-specific key-value configurations.
- * @param experimentalMethods The experimental methods.
--- End diff --

These comments add little or no value. We should either remove them or make them 
more detailed; which would you prefer? If the latter, what would be a good doc for 
shared state and experimental methods?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-23 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17379#discussion_r107819998
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -32,19 +32,9 @@ import 
org.apache.spark.sql.streaming.StreamingQueryManager
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]] backed by Hive.
- * @param sparkContext The [[SparkContext]].
- * @param sharedState The shared state.
- * @param conf SQL-specific key-value configurations.
- * @param experimentalMethods The experimental methods.
- * @param functionRegistry Internal catalog for managing functions 
registered by the user.
  * @param catalog Internal catalog for managing table and database states 
that uses Hive client for
  *interacting with the metastore.
- * @param sqlParser Parser that extracts expressions, plans, table 
identifiers etc. from SQL texts.
  * @param metadataHive The Hive metadata client.
- * @param analyzer Logical query plan analyzer for resolving unresolved 
attributes and relations.
- * @param streamingQueryManager Interface to start and stop
- *  
[[org.apache.spark.sql.streaming.StreamingQuery]]s.
- * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from 
a [[LogicalPlan]]
--- End diff --

Each of these is identical to its `SessionState` counterpart and should 
be picked up by Scaladoc comment inheritance; do we still need them?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17382: [SPARK-20051][SS] Fix StreamSuite flaky test - recover f...

2017-03-21 Thread kunalkhamar
Github user kunalkhamar commented on the issue:

https://github.com/apache/spark/pull/17382
  
@tdas all tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17379: [SPARK-20048][SQL] Cloning SessionState does not clone q...

2017-03-21 Thread kunalkhamar
Github user kunalkhamar commented on the issue:

https://github.com/apache/spark/pull/17379
  
retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17382: [SPARK-20051][SS] Fix StreamSuite flaky test - re...

2017-03-21 Thread kunalkhamar
GitHub user kunalkhamar opened a pull request:

https://github.com/apache/spark/pull/17382

[SPARK-20051][SS] Fix StreamSuite flaky test - recover from v2.1 checkpoint

## What changes were proposed in this pull request?

The test fails with an IOException because a delete command fails. The fix queues up 
the deletion at shutdown using the shutdown hooks util.
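
A minimal sketch of the idea (using a plain JVM shutdown hook here for brevity; 
the patch itself uses Spark's shutdown hooks util, and the path below is hypothetical):

    import java.io.File
    import org.apache.commons.io.FileUtils

    // Deleting the copied checkpoint dir inline raced with the running query and
    // could throw IOException; instead, schedule the deletion for JVM shutdown.
    val tempCheckpointDir = new File("/tmp/spark-test-checkpoint") // hypothetical path
    sys.addShutdownHook {
      FileUtils.deleteQuietly(tempCheckpointDir)
    }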

## How was this patch tested?

- Unit test
  - repeated 300 runs with no failure

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kunalkhamar/spark partition-bugfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17382.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17382


commit 8c0fcb1f5406e76908ee045d4530d09e5da1c017
Author: Kunal Khamar <kkha...@outlook.com>
Date:   2017-03-21T23:54:40Z

Fix StreamSuite flaky test recover from checkpoint.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-03-21 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r107283640
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -17,43 +17,70 @@
 
 package org.apache.spark.sql.internal
 
-import java.io.File
-
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryManager}
+import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]].
+ * @param functionRegistry Internal catalog for managing functions 
registered by the user.
+ * @param catalog Internal catalog for managing table and database states.
+ * @param sqlParser Parser that extracts expressions, plans, table 
identifiers etc. from SQL texts.
+ * @param analyzer Logical query plan analyzer for resolving unresolved 
attributes and relations.
+ * @param streamingQueryManager Interface to start and stop
+ *  
[[org.apache.spark.sql.streaming.StreamingQuery]]s.
+ * @param queryExecutionCreator Lambda to create a [[QueryExecution]] from 
a [[LogicalPlan]]
--- End diff --

@rxin Removing the redundant comments in 
[SPARK-20048](https://github.com/apache/spark/pull/17379).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17379: [SPARK-20048][SQL] Cloning SessionState does not ...

2017-03-21 Thread kunalkhamar
GitHub user kunalkhamar opened a pull request:

https://github.com/apache/spark/pull/17379

[SPARK-20048][SQL] Cloning SessionState does not clone query execution 
listeners

## What changes were proposed in this pull request?

Bugfix from SPARK-19540.
Cloning SessionState does not clone query execution listeners, so the cloned 
session is unable to listen to events on queries.
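
A rough sketch of the behaviour this fixes (assumes a SparkSession named `spark`; 
`cloneSession` is internal API and is shown here only for illustration):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    val events = new ArrayBuffer[String]()
    val listener = new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
        events += funcName
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
        events += s"$funcName failed"
    }

    spark.listenerManager.register(listener)
    val forked = spark.cloneSession()
    forked.range(10).collect() // with the fix, the forked session also reports events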

## How was this patch tested?

- Unit test


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kunalkhamar/spark clone-bugfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17379.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17379


commit ad77fe9ad258eac224f069bbc89294818ee6b549
Author: Kunal Khamar <kkha...@outlook.com>
Date:   2017-03-21T21:16:04Z

Fix cloning of listener manager. Remove redundant comments.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

2017-03-17 Thread kunalkhamar
Github user kunalkhamar commented on the issue:

https://github.com/apache/spark/pull/17216
  
@zsxwing Will change cloning of listener manager in a new PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

2017-03-17 Thread kunalkhamar
Github user kunalkhamar commented on the issue:

https://github.com/apache/spark/pull/17216
  
@uncleGen Not sure what that means, could you please elaborate?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-17 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106724958
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
 ---
@@ -29,12 +30,32 @@ class OffsetSeqLogSuite extends SparkFunSuite with 
SharedSQLContext {
   case class StringOffset(override val json: String) extends Offset
 
   test("OffsetSeqMetadata - deserialization") {
-assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
-assert(OffsetSeqMetadata(1, 0) === 
OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
-assert(OffsetSeqMetadata(0, 2) === 
OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
-assert(
-  OffsetSeqMetadata(1, 2) ===
-
OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+val key = SQLConf.SHUFFLE_PARTITIONS.key
+
+def getConfWith(shufflePartitions: Int): Map[String, String] = {
+  Map(key -> shufflePartitions.toString)
+}
+
+// None set
+assert(OffsetSeqMetadata(0, 0, Map.empty) === 
OffsetSeqMetadata("""{}"""))
+
+// One set
+assert(OffsetSeqMetadata(1, 0, Map.empty) === 
OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
+assert(OffsetSeqMetadata(0, 2, Map.empty) === 
OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
+assert(OffsetSeqMetadata(0, 0, getConfWith(shufflePartitions = 2)) ===
+  OffsetSeqMetadata(s"""{"conf": {"$key":2}}"""))
+
+// Two set
+assert(OffsetSeqMetadata(1, 2, Map.empty) ===
+  OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+assert(OffsetSeqMetadata(1, 0, getConfWith(shufflePartitions = 3)) ===
+  OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}"""))
+assert(OffsetSeqMetadata(0, 2, getConfWith(shufflePartitions = 3)) ===
+  OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+// All set
+assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+  
OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": 
{"$key":3}}"""))
--- End diff --

Added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-17 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106724948
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -549,9 +581,15 @@ class StreamExecution(
   cd.dataType, cd.timeZoneId)
 }
 
+// Reset confs to disallow change in number of partitions
--- End diff --

Good point, changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106285230
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -380,7 +387,27 @@ class StreamExecution(
 logInfo(s"Resuming streaming query, starting with batch $batchId")
 currentBatchId = batchId
 availableOffsets = nextOffsets.toStreamProgress(sources)
-offsetSeqMetadata = 
nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+
+// initialize metadata
+val shufflePartitionsSparkSession: Int = 
sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+offsetSeqMetadata = {
+  if (nextOffsets.metadata.isEmpty) {
+OffsetSeqMetadata(0, 0,
+  Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
shufflePartitionsSparkSession.toString))
+  } else {
+val metadata = nextOffsets.metadata.get
+val shufflePartitionsToUse = 
metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
+  // For backward compatibility, if # partitions was not 
recorded in the offset log,
+  // then ensure it is not missing. The new value is picked up 
from the conf.
+  logDebug("Number of shuffle partitions from previous run not 
found in checkpoint. "
--- End diff --

Changed to log a warning.
Rechecked the semantics: it works as expected, and the warning is only printed at 
the time of the first upgrade.
Once we restart a query from a v2.1 checkpoint and then stop it, any new 
offsets written out will contain the number of shuffle partitions. Any future restarts 
will read these new offsets in 
`StreamExecution.populateStartOffsets -> offsetLog.getLatest` and pick up the 
recorded number of shuffle partitions.
Useful to note for future reference: we do not rewrite the old offset 
files to contain the number of shuffle partitions; the semantics are still correct 
because of the call to `offsetLog.getLatest`.
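
For example, the offset log after such an upgrade might look roughly like this 
(batch numbers and timestamps are illustrative; only the metadata lines are shown):

    offsets/0   written by Spark 2.1, no "conf" section:
                {"batchWatermarkMs":0,"batchTimestampMs":1489000000000}
    offsets/1   written after restarting on the patched version:
                {"batchWatermarkMs":0,"batchTimestampMs":1489000100000,"conf":{"spark.sql.shuffle.partitions":"10"}}

On the next restart, `offsetLog.getLatest` returns batch 1, so the recorded value 
is used and the warning is not printed again.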


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106269036
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 
checkpoint") {
--- End diff --

I see, removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106266808
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -380,7 +387,27 @@ class StreamExecution(
 logInfo(s"Resuming streaming query, starting with batch $batchId")
 currentBatchId = batchId
 availableOffsets = nextOffsets.toStreamProgress(sources)
-offsetSeqMetadata = 
nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+
+// initialize metadata
+val shufflePartitionsSparkSession: Int = 
sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+offsetSeqMetadata = {
+  if (nextOffsets.metadata.isEmpty) {
+OffsetSeqMetadata(0, 0,
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106269742
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 
checkpoint") {
+var inputData: MemoryStream[Int] = null
+var query: DataStreamWriter[Row] = null
+
+def init(): Unit = {
+  inputData = MemoryStream[Int]
+  inputData.addData(1, 2, 3, 4)
+  inputData.addData(3, 4, 5, 6)
+  inputData.addData(5, 6, 7, 8)
+
+  query = inputData
+.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.writeStream
+.outputMode("complete")
+.format("memory")
+}
+
+// Get an existing checkpoint generated by Spark v2.1.
+// v2.1 does not record # shuffle partitions in the offset metadata.
+val resourceUri =
+  
this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri)
+
+// 1 - Test if recovery from the checkpoint is successful.
+init()
+withTempDir(dir => {
+  // Copy the checkpoint to a temp dir to prevent changes to the 
original.
+  // Not doing this will lead to the test passing on the first run, 
but fail subsequent runs.
+  FileUtils.copyDirectory(checkpointDir, dir)
+
+  // Checkpoint data was generated by a query with 10 shuffle 
partitions.
+  // In order to test reading from the checkpoint, the checkpoint must 
have two or more batches,
+  // since the last batch may be rerun.
+  withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+var streamingQuery: StreamingQuery = null
+try {
+  streamingQuery =
+query.queryName("counts").option("checkpointLocation", 
dir.getCanonicalPath).start()
+  streamingQuery.processAllAvailable()
+  inputData.addData(9)
+  streamingQuery.processAllAvailable()
+
+  QueryTest.checkAnswer(spark.table("counts").toDF(),
+Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: 
Row("9", 1) :: Nil)
+} finally {
+  if (streamingQuery ne null) {
+streamingQuery.stop()
+  }
+}
+  }
+})
+
+// 2 - Check recovery with wrong num shuffle partitions
+init()
+withTempDir(dir => {
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106269427
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 
checkpoint") {
+var inputData: MemoryStream[Int] = null
+var query: DataStreamWriter[Row] = null
+
+def init(): Unit = {
+  inputData = MemoryStream[Int]
+  inputData.addData(1, 2, 3, 4)
+  inputData.addData(3, 4, 5, 6)
+  inputData.addData(5, 6, 7, 8)
+
+  query = inputData
+.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.writeStream
+.outputMode("complete")
+.format("memory")
+}
+
+// Get an existing checkpoint generated by Spark v2.1.
+// v2.1 does not record # shuffle partitions in the offset metadata.
+val resourceUri =
+  
this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri)
+
+// 1 - Test if recovery from the checkpoint is successful.
+init()
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106269722
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 
checkpoint") {
+var inputData: MemoryStream[Int] = null
+var query: DataStreamWriter[Row] = null
+
+def init(): Unit = {
+  inputData = MemoryStream[Int]
+  inputData.addData(1, 2, 3, 4)
+  inputData.addData(3, 4, 5, 6)
+  inputData.addData(5, 6, 7, 8)
+
+  query = inputData
+.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.writeStream
+.outputMode("complete")
+.format("memory")
+}
+
+// Get an existing checkpoint generated by Spark v2.1.
+// v2.1 does not record # shuffle partitions in the offset metadata.
+val resourceUri =
+  
this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri)
+
+// 1 - Test if recovery from the checkpoint is successful.
+init()
+withTempDir(dir => {
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106266038
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 
checkpoint") {
+var inputData: MemoryStream[Int] = null
+var query: DataStreamWriter[Row] = null
+
+def init(): Unit = {
+  inputData = MemoryStream[Int]
+  inputData.addData(1, 2, 3, 4)
+  inputData.addData(3, 4, 5, 6)
+  inputData.addData(5, 6, 7, 8)
+
+  query = inputData
+.toDF()
+.groupBy($"value")
+.agg(count("*"))
+.writeStream
+.outputMode("complete")
+.format("memory")
+}
+
+// Get an existing checkpoint generated by Spark v2.1.
+// v2.1 does not record # shuffle partitions in the offset metadata.
+val resourceUri =
+  
this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri)
+
+// 1 - Test if recovery from the checkpoint is successful.
+init()
+withTempDir(dir => {
+  // Copy the checkpoint to a temp dir to prevent changes to the 
original.
+  // Not doing this will lead to the test passing on the first run, 
but fail subsequent runs.
+  FileUtils.copyDirectory(checkpointDir, dir)
+
+  // Checkpoint data was generated by a query with 10 shuffle 
partitions.
+  // In order to test reading from the checkpoint, the checkpoint must 
have two or more batches,
+  // since the last batch may be rerun.
+  withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+var streamingQuery: StreamingQuery = null
+try {
+  streamingQuery =
+query.queryName("counts").option("checkpointLocation", 
dir.getCanonicalPath).start()
+  streamingQuery.processAllAvailable()
+  inputData.addData(9)
+  streamingQuery.processAllAvailable()
+
+  QueryTest.checkAnswer(spark.table("counts").toDF(),
+Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: 
Row("9", 1) :: Nil)
+} finally {
+  if (streamingQuery ne null) {
+streamingQuery.stop()
+  }
+}
+  }
+})
+
+// 2 - Check recovery with wrong num shuffle partitions
+init()
+withTempDir(dir => {
+  FileUtils.copyDirectory(checkpointDir, dir)
+
+  // Since the number of partitions is greater than 10, should throw 
exception.
+  withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
+var streamingQuery: StreamingQuery = null
+try {
+  intercept[StreamingQueryException] {
--- End diff --

https://github.com/apache/spark/pull/17216#discussion_r106033678


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-15 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106267528
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -437,25 +464,28 @@ class StreamExecution(
   }
 }
 if (hasNewData) {
-  // Current batch timestamp in milliseconds
-  offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+  var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
   // Update the eventTime watermark if we find one in the plan.
   if (lastExecution != null) {
 lastExecution.executedPlan.collect {
   case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 
0 =>
 logDebug(s"Observed event time stats: 
${e.eventTimeStats.value}")
 e.eventTimeStats.value.max - e.delayMs
 }.headOption.foreach { newWatermarkMs =>
-  if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
+  if (newWatermarkMs > batchWatermarkMs) {
 logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
+batchWatermarkMs = newWatermarkMs
   } else {
 logDebug(
   s"Event time didn't move: $newWatermarkMs < " +
-s"${offsetSeqMetadata.batchWatermarkMs}")
+s"$batchWatermarkMs")
   }
 }
   }
+  offsetSeqMetadata = OffsetSeqMetadata(
--- End diff --

Good point, changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106043061
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record 
shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
+  
this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri).getCanonicalPath
+val query = inputData
+  .toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .writeStream
+  .queryName("counts")
+  .outputMode("complete")
+  .option("checkpointLocation", checkpointDir)
+  .format("memory")
+
+// Checkpoint data was generated by a query with 10 shuffle partitions.
+// Test if recovery from checkpoint is successful.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+  query.start().processAllAvailable()
--- End diff --

Added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105792431
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 ---
@@ -70,13 +69,16 @@ object OffsetSeq {
  * bound the lateness of data that will processed. Time unit: milliseconds
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
+ * @param conf: Additional conf_s to be persisted across batches, e.g. 
number of shuffle partitions.
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var 
batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+var batchWatermarkMs: Long = 0,
--- End diff --

Changed to vals.
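
For context, the shape of the change (sketch only; `OffsetSeqMetadataOld` is a 
made-up name for the previous definition, the fields match the diff above):

    // Before: mutable fields, updated in place between batches.
    case class OffsetSeqMetadataOld(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0)

    // After: immutable fields; callers construct an updated copy instead of mutating.
    case class OffsetSeqMetadata(
        batchWatermarkMs: Long = 0,
        batchTimestampMs: Long = 0,
        conf: Map[String, String] = Map.empty)

    val meta = OffsetSeqMetadata(batchWatermarkMs = 5)
    val updated = meta.copy(batchTimestampMs = System.currentTimeMillis())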


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106033678
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record 
shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
+  
this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri).getCanonicalPath
+val query = inputData
+  .toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .writeStream
+  .queryName("counts")
+  .outputMode("complete")
+  .option("checkpointLocation", checkpointDir)
+  .format("memory")
+
+// Checkpoint data was generated by a query with 10 shuffle partitions.
+// Test if recovery from checkpoint is successful.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+  query.start().processAllAvailable()
+
+  QueryTest.checkAnswer(spark.table("counts").toDF(),
+Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
+}
+
+// If the number of partitions is greater, should throw exception.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
--- End diff --

Seems okay to me. The underlying cause is a `FileNotFoundException`. The error 
message indicates _Error reading delta file 
/Users/path/to/checkpoint/state/[operator]/[partition]/[batch].delta_
> [info] - SPARK-19873: backward compatibility - recover with wrong num 
shuffle partitions *** FAILED *** (12 seconds, 98 milliseconds)
[info]   org.apache.spark.sql.streaming.StreamingQueryException: Query 
badQuery [id = dddc5e7f-1e71-454c-8362-de18fb5a, runId = 
b2960c74-257a-4eb1-b242-61d13e20655f] terminated with exception: Job aborted 
due to stage failure: Task 10 in stage 1.0 failed 1 times, most recent failure: 
Lost task 10.0 in stage 1.0 (TID 11, localhost, executor driver): 
java.lang.IllegalStateException: Error reading delta file 
/Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10/1.delta
 of HDFSStateStoreProvider[id = (op=0, part=10), dir = 
/Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10]:
 
/Users/kunalkhamar/spark/target/tmp/spark-2816c3be-610f-450c-a821-6d0c68a12d91/state/0/10/1.delta
 does not exist
[info]  at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:384)
[info]  at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:336)
[info]  at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:333)
[info]  at scala.Option.getOrElse(Option.scala:121)
[info]  at 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStat

[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106043087
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record 
shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
--- End diff --

Added more comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-14 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r106042959
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -389,6 +390,61 @@ class StreamSuite extends StreamTest {
 query.stop()
 assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of 
partitions") {
+val inputData = MemoryStream[(Int, Int)]
+val agg = inputData.toDS().groupBy("_1").count()
+
+testStream(agg, OutputMode.Complete())(
+  AddData(inputData, (1, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"2")),
+  CheckAnswer((1, 1), (2, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (2, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"5")),
+  CheckAnswer((1, 1), (2, 2), (3, 1)),
+  StopStream,
+  AddData(inputData, (3, 0), (1, 0)),
+  StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> 
"1")),
+  CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("SPARK-19873: backward compat with checkpoints that do not record 
shuffle partitions") {
+val inputData = MemoryStream[Int]
+inputData.addData(1, 2, 3, 4)
+inputData.addData(3, 4, 5, 6)
+inputData.addData(5, 6, 7, 8)
+
+val resourceUri =
+  
this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+val checkpointDir = new File(resourceUri).getCanonicalPath
+val query = inputData
+  .toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .writeStream
+  .queryName("counts")
+  .outputMode("complete")
+  .option("checkpointLocation", checkpointDir)
+  .format("memory")
+
+// Checkpoint data was generated by a query with 10 shuffle partitions.
+// Test if recovery from checkpoint is successful.
+withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+  query.start().processAllAvailable()
+
+  QueryTest.checkAnswer(spark.table("counts").toDF(),
+Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
+}
--- End diff --

Added `try .. finally`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17216: [SPARK-19873][SS] Record num shuffle partitions in offse...

2017-03-09 Thread kunalkhamar
Github user kunalkhamar commented on the issue:

https://github.com/apache/spark/pull/17216
  
@zsxwing @uncleGen @lw-lin 
This is ready for another review, can you please take a look when you get a 
chance?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-09 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105312919
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 ---
@@ -71,7 +71,10 @@ object OffsetSeq {
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var 
batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+var batchWatermarkMs: Long = 0,
+var batchTimestampMs: Long = 0,
+var numShufflePartitions: Int = 0) {
--- End diff --

@lw-lin Hi Liwei!
Thanks for letting me know, we will not be updating the log version number 
since backward and forward compatibility is preserved by this patch.
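
A small sketch of why both directions keep working (using json4s, as the offset log 
metadata does; the case class names are stand-ins, not the actual Spark code):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    import org.json4s.jackson.Serialization

    implicit val formats: Formats = DefaultFormats

    case class OldMetadata(batchWatermarkMs: Long = 0, batchTimestampMs: Long = 0)
    case class NewMetadata(
        batchWatermarkMs: Long = 0,
        batchTimestampMs: Long = 0,
        conf: Map[String, String] = Map.empty)

    // Backward compatible: new code reads a v2.1 metadata line; "conf" falls back to empty.
    val readByNew = parse("""{"batchWatermarkMs":1,"batchTimestampMs":2}""").extract[NewMetadata]

    // Forward compatible: old code reads a new metadata line; the unknown "conf" field is ignored.
    val newLine = Serialization.write(NewMetadata(1, 2, Map("spark.sql.shuffle.partitions" -> "10")))
    val readByOld = parse(newLine).extract[OldMetadata]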


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-09 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105310100
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 ---
@@ -71,7 +71,10 @@ object OffsetSeq {
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var 
batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+var batchWatermarkMs: Long = 0,
+var batchTimestampMs: Long = 0,
+var numShufflePartitions: Int = 0) {
--- End diff --

@zsxwing Changed to a map
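For context, a rough sketch of what the map-based metadata and the conf fallback could look like (the field name and helper below are assumptions for illustration, not the exact committed code):

    // Hypothetical sketch: carry per-batch confs as a generic map instead of a
    // dedicated numShufflePartitions field.
    case class OffsetSeqMetadata(
        var batchWatermarkMs: Long = 0,
        var batchTimestampMs: Long = 0,
        var conf: Map[String, String] = Map.empty)

    // On recovery, fill in any key an older log did not record from the session conf.
    def withSessionDefaults(
        metadata: OffsetSeqMetadata,
        sessionConf: Map[String, String]): OffsetSeqMetadata = {
      val key = "spark.sql.shuffle.partitions"
      if (metadata.conf.contains(key)) {
        metadata
      } else {
        metadata.copy(conf = metadata.conf + (key -> sessionConf.getOrElse(key, "200")))
      }
    }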


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-09 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105310054
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -380,7 +382,20 @@ class StreamExecution(
 logInfo(s"Resuming streaming query, starting with batch $batchId")
 currentBatchId = batchId
 availableOffsets = nextOffsets.toStreamProgress(sources)
-offsetSeqMetadata = 
nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+val numShufflePartitionsFromConf = 
sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+offsetSeqMetadata = nextOffsets
+  .metadata
+  .getOrElse(OffsetSeqMetadata(0, 0, numShufflePartitionsFromConf))
+
+/*
+ * For backwards compatibility, if # partitions was not recorded 
in the offset log, then
+ * ensure it is non-zero. The new value is picked up from the conf.
+ */
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-08 Thread kunalkhamar
GitHub user kunalkhamar opened a pull request:

https://github.com/apache/spark/pull/17216

[SPARK-19873][SS] Record num shuffle partitions in offset log and enforce 
in next batch.

## What changes were proposed in this pull request?

If the user changes the shuffle partition number between batches, streaming 
aggregation will fail.

Here are some possible cases:

- Change "spark.sql.shuffle.partitions" (a minimal sketch of this case follows the 
list)
- Use "repartition" and change the partition number in code
- RangePartitioner doesn't generate deterministic partitions. Right now 
it's safe as we disallow sort before aggregation. Not sure if we will add some 
operators using RangePartitioner in the future.
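
To make the first case concrete, here is a minimal sketch of that scenario (the checkpoint path and local master setting are assumptions for illustration; `MemoryStream` is the same test source used in the suites above):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.streaming.MemoryStream

    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._
    implicit val sqlContext = spark.sqlContext

    // First run: aggregation state is laid out for 10 shuffle partitions.
    spark.conf.set("spark.sql.shuffle.partitions", "10")
    val input = MemoryStream[Int]
    input.addData(1, 2, 3)
    val query = input.toDF().groupBy($"value").count().writeStream
      .outputMode("complete")
      .format("memory")
      .queryName("counts")
      .option("checkpointLocation", "/tmp/spark-19873-ckpt")  // assumed path
      .start()
    query.processAllAvailable()
    query.stop()

    // Restarting from the same checkpoint after, e.g.,
    //   spark.conf.set("spark.sql.shuffle.partitions", "5")
    // is the case that fails without this patch: the restored aggregation state
    // no longer matches the new shuffle partitioning.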

## How was this patch tested?

Unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kunalkhamar/spark num-partitions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17216.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17216


commit 12f5fd30229e441355a05290ed124263c1429acc
Author: Kunal Khamar <kkha...@outlook.com>
Date:   2017-03-08T21:29:02Z

Record num shuffle partitions in offset log and enforce in next batch.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-03-07 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r104781325
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala ---
@@ -113,19 +132,26 @@ class SessionStateSuite extends SparkFunSuite
 Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
 }
 
-implicit val enc = Encoders.tuple(Encoders.scalaInt, Encoders.STRING)
+val spark = activeSession
+// Cannot use `import activeSession.implicits._` due to the compiler 
limitation.
+import spark.implicits._
+
 activeSession
--- End diff --

Should the temp view creation be inside the try block?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-03-07 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r104780858
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala ---
@@ -20,86 +20,105 @@ package org.apache.spark.sql
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.SparkContext
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
 class SessionStateSuite extends SparkFunSuite
 with BeforeAndAfterEach with BeforeAndAfterAll {
 
+  /**
+   * A shared SparkSession for all tests in this suite. Make sure you 
reset any changes to this
+   * session as this is a singleton HiveSparkSession in 
HiveSessionStateSuite and it's shared
+   * with all Hive test suites.
+   */
   protected var activeSession: SparkSession = _
-  protected var sparkContext: SparkContext = null
 
   override def beforeAll(): Unit = {
-sparkContext = 
SparkSession.builder().master("local").getOrCreate().sparkContext
+activeSession = SparkSession.builder().master("local").getOrCreate()
   }
 
-  protected def createSession(): Unit = {
-activeSession =
-  
SparkSession.builder().master("local").sparkContext(sparkContext).getOrCreate()
-  }
-
-  override def beforeEach(): Unit = {
-createSession()
+  override def afterAll(): Unit = {
+if (activeSession != null) {
+  activeSession.stop()
+  activeSession = null
+}
+super.afterAll()
   }
 
   test("fork new session and inherit RuntimeConfig options") {
 val key = "spark-config-clone"
 activeSession.conf.set(key, "active")
--- End diff --

Should this be inside a try {} block?
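For illustration, one way that cleanup could look if the conf change moves into a try block (a sketch only, not the committed test):

    // Hypothetical sketch: restore the shared session even if an assertion fails.
    val key = "spark-config-clone"
    activeSession.conf.set(key, "active")
    try {
      val forkedSession = activeSession.cloneSession()
      assert(forkedSession.conf.get(key) == "active")
    } finally {
      activeSession.conf.unset(key)
    }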


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-03-07 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r104786618
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -32,10 +32,19 @@ import 
org.apache.spark.sql.streaming.StreamingQueryManager
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]] backed by Hive.
- * @param catalog A Hive client used for interacting with the metastore.
- * @param analyzer An analyzer that uses the Hive metastore.
- * @param plannerCreator Lambda to create a [[SparkPlanner]] that converts 
optimized logical
- *   plans to physical plans.
+ * @param sparkContext The [[SparkContext]].
+ * @param sharedState The shared state.
+ * @param conf SQL-specific key-value configurations.
+ * @param experimentalMethods The experimental methods.
+ * @param functionRegistry Internal catalog for managing functions 
registered by the user.
+ * @param catalog Internal catalog for managing table and database states.
--- End diff --

Add a comment on the difference from `SessionCatalog`: `HiveSessionCatalog` uses a 
Hive client for interacting with the metastore.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-03-07 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r104784305
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 ---
@@ -1241,16 +1246,30 @@ class SessionCatalogSuite extends PlanTest {
 assert(clone.getCurrentDatabase == db1)
 
 // check if clone and original independent
-val db2 = "copytest2"
-val tempTable2 = Range(1, 20, 2, 20)
-clone.createTempView(db2, tempTable2, overrideIfExists = false)
 clone.setCurrentDatabase(db2)
 assert(original.getCurrentDatabase == db1)
-
-val db3 = "copytest3"
-val tempTable3 = Range(1, 30, 2, 30)
-original.createTempView(db3, tempTable3, overrideIfExists = false)
 original.setCurrentDatabase(db3)
 assert(clone.getCurrentDatabase == db2)
   }
+
+  test("SPARK-19737: detect undefined functions without triggering 
relation resolution") {
--- End diff --

Is this supposed to be part of this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103352266
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -17,43 +17,60 @@
 
 package org.apache.spark.sql.internal
 
-import java.io.File
-
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryManager}
+import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]].
  */
-private[sql] class SessionState(sparkSession: SparkSession) {
+private[sql] class SessionState(
+sparkContext: SparkContext,
+sharedState: SharedState,
+val conf: SQLConf,
+val experimentalMethods: ExperimentalMethods,
+val functionRegistry: FunctionRegistry,
+val catalog: SessionCatalog,
+val sqlParser: ParserInterface,
+val analyzer: Analyzer,
+val streamingQueryManager: StreamingQueryManager,
+val queryExecutionCreator: LogicalPlan => QueryExecution) {
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307491
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  
newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, 
confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): HiveSessionState = {
+
+val initHelper = SessionState(sparkSession, conf)
+
+val sparkContext = sparkSession.sparkContext
+
+val catalog = HiveSessionCatalog(
+  sparkSession,
+  SessionState.createFunctionResourceLoader(sparkContext, 
sparkSession.sharedState),
+  initHelper.functionRegistry,
+  initHelper.conf,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
initHelper.conf),
+  initHelper.sqlParser)
+
+// A Hive client used for interacting with the metastore.
+val metadataHive: HiveClient =
+  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+// An analyzer that uses the Hive metastore.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, 
initHelper.conf)
+
+val plannerCreator = createPlannerCreator(
+  sparkSession,
+  initHelper.conf,
+  initHelper.experimentalMethods)
+
+new HiveSessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  initHelper.conf,
+  initHelper.experimentalMethods,
+  initHelper.functionRegistry,
+  catalog,
+  initHelper.sqlParser,
+  metadataHive,
+  analyzer,
+  initHelper.streamingQueryManager,
+  initHelper.queryExecutionCreator,
+  plannerCreator)
+  }
+
+  def createAnalyzer(
+  sparkSession: SparkSession,
+  catalog: HiveSessionCatalog,
+  sqlConf: SQLConf): Analyzer = {
+
+new Analyzer(catalog, sqlConf) {
+  override val extendedResolutionRules =
+new ResolveHiveSerdeTable(sparkSession) ::
+new FindDataSourceTable(sparkSession) ::
+new FindHiveSerdeTable(sparkSession) ::
+new ResolveSQLOnFile(sparkSession) :: Nil
+
+  override val postHocResolutionRules =
+catalog.ParquetConversions ::
+catalog.OrcConversions ::
+PreprocessTableCreation(sparkSession) ::
+PreprocessTableInsertion(sqlConf) ::
+DataSourceAnalysis(sqlConf) ::
+HiveAnalysis :: Nil
+
+  override val extendedCheckRules = Seq(PreWriteCheck)
+}
+  }
+
+  def createPlannerCreator(
+  associatedSparkSession: SparkSession,
+  sqlConf: SQLConf,
+  experimentalMethods: ExperimentalMethods): () => SparkPlanner = {
+
--- End diff --

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103338672
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+class SessionStateSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  protected var activeSession: SparkSession = _
+
+  protected def createSession(): Unit = {
+activeSession = SparkSession.builder().master("local").getOrCreate()
+  }
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {
+activeSession.stop()
+  }
+
+  test("fork new session and inherit RuntimeConfig options") {
+val key = "spark-config-clone"
+activeSession.conf.set(key, "active")
+
+// inheritance
+val forkedSession = activeSession.cloneSession()
+assert(forkedSession ne activeSession)
+assert(forkedSession.conf ne activeSession.conf)
+assert(forkedSession.conf.get(key) == "active")
+
+// independence
+forkedSession.conf.set(key, "forked")
+assert(activeSession.conf.get(key) == "active")
+activeSession.conf.set(key, "dontcopyme")
+assert(forkedSession.conf.get(key) == "forked")
+  }
+
+  test("fork new session and inherit function registry and udf") {
+activeSession.udf.register("strlenScala", (_: String).length + (_: 
Int))
+val forkedSession = activeSession.cloneSession()
+
+// inheritance
+assert(forkedSession ne activeSession)
+assert(forkedSession.sessionState.functionRegistry ne
+  activeSession.sessionState.functionRegistry)
+
assert(forkedSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+
+// independence
+forkedSession.sessionState.functionRegistry.dropFunction("strlenScala")
+
assert(activeSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+activeSession.udf.register("addone", (_: Int) + 1)
+
assert(forkedSession.sessionState.functionRegistry.lookupFunction("addone").isEmpty)
+  }
+
+  test("fork new session and inherit experimental methods") {
+object DummyRule1 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+object DummyRule2 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+val optimizations = List(DummyRule1, DummyRule2)
+
+activeSession.experimental.extraOptimizations = optimizations
+
+val forkedSession = activeSession.cloneSession()
+
+// inheritance
+assert(forkedSession ne activeSession)
+assert(forkedSession.experimental ne activeSession.experimental)
+assert(forkedSession.experimental.extraOptimizations.toSet ==
+  activeSession.experimental.extraOptimizations.toSet)
+
+// independence
+forkedSession.experimental.extraOptimizations = List(DummyRule2)
+assert(activeSession.experimental.extraOptimizations == optimizations)
+activeSession.experimental.extraOptimizations = List(DummyRule1)
+assert(forkedSession.experimental.extraOptimizations == 
List(DummyRule2))
+  }
+
+  test("fork new sessions and run query on inherited table") {
+def checkTableExists(sparkSession: SparkSession): Unit = {
+  QueryTest.checkAnswer(sparkSession.sql(
+"""

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103305709
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -90,110 +208,29 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 }
   }
 
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  lazy val catalog = new SessionCatalog(
-sparkSession.sharedState.externalCatalog,
-sparkSession.sharedState.globalTempViewManager,
-functionResourceLoader,
-functionRegistry,
-conf,
-newHadoopConf(),
-sqlParser)
+  def newHadoopConf(copyHadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
+val hadoopConf = new Configuration(copyHadoopConf)
+sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) 
hadoopConf.set(k, v) }
+hadoopConf
+  }
 
-  /**
-   * Interface exposed to the user for registering user-defined functions.
-   * Note that the user-defined functions must be deterministic.
-   */
-  lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
+  def createAnalyzer(
+  sparkSession: SparkSession,
+  catalog: SessionCatalog,
+  sqlConf: SQLConf): Analyzer = {
 
-  /**
-   * Logical query plan analyzer for resolving unresolved attributes and 
relations.
-   */
-  lazy val analyzer: Analyzer = {
-new Analyzer(catalog, conf) {
+new Analyzer(catalog, sqlConf) {
   override val extendedResolutionRules =
 new FindDataSourceTable(sparkSession) ::
 new ResolveSQLOnFile(sparkSession) :: Nil
 
   override val postHocResolutionRules =
 PreprocessTableCreation(sparkSession) ::
-PreprocessTableInsertion(conf) ::
-DataSourceAnalysis(conf) :: Nil
+PreprocessTableInsertion(sqlConf) ::
+DataSourceAnalysis(sqlConf) :: Nil
 
   override val extendedCheckRules = Seq(PreWriteCheck, HiveOnlyCheck)
 }
   }
 
-  /**
-   * Logical query plan optimizer.
-   */
-  lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, 
experimentalMethods)
-
-  /**
-   * Parser that extracts expressions, plans, table identifiers etc. from 
SQL texts.
-   */
-  lazy val sqlParser: ParserInterface = new SparkSqlParser(conf)
-
-  /**
-   * Planner that converts optimized logical plans to physical plans.
-   */
-  def planner: SparkPlanner =
-new SparkPlanner(sparkSession.sparkContext, conf, 
experimentalMethods.extraStrategies)
-
-  /**
-   * An interface to register custom 
[[org.apache.spark.sql.util.QueryExecutionListener]]s
-   * that listen for execution metrics.
-   */
-  lazy val listenerManager: ExecutionListenerManager = new 
ExecutionListenerManager
-
-  /**
-   * Interface to start and stop [[StreamingQuery]]s.
-   */
-  lazy val streamingQueryManager: StreamingQueryManager = {
-new StreamingQueryManager(sparkSession)
-  }
-
-  private val jarClassLoader: NonClosableMutableURLClassLoader =
-sparkSession.sharedState.jarClassLoader
-
-  // Automatically extract all entries and put it in our SQLConf
-  // We need to call it after all of vals have been initialized.
-  sparkSession.sparkContext.getConf.getAll.foreach { case (k, v) =>
-conf.setConfString(k, v)
-  }
-
-  // --
-  //  Helper methods, partially leftover from pre-2.0 days
-  // --
-
-  def executePlan(plan: LogicalPlan): QueryExecution = new 
QueryExecution(sparkSession, plan)
-
-  def refreshTable(tableName: String): Unit = {
-catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
-  }
-
-  def addJar(path: String): Unit = {
-sparkSession.sparkContext.addJar(path)
-
-val uri = new Path(path).toUri
-val jarURL = if (uri.getScheme == null) {
-  // `path` is a local file path without a URL scheme
-  new File(path).toURI.toURL
-} else {
-  // `path` is a URL with a scheme
-  uri.toURL
-}
-jarClassLoader.addURL(jarURL)
-Thread.currentThread().setContextClassLoader(jarClassLoader)
-  }
-
-  /**
-   * Analyzes the given table in the current database to generate 
statistics, which will be
-   * used in query optimizations.
-   */
-  def analyze(tableIdent: TableIdentifier, noscan: Boolean = true): Unit = 
{
--- End diff --

Cool.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your p

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r10330
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+
+class SessionStateSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  protected var activeSession: SparkSession = _
+
+  protected def createSession(): Unit = {
+activeSession = SparkSession.builder().master("local").getOrCreate()
+  }
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {
+activeSession.stop()
+  }
+
+  test("fork new session and inherit RuntimeConfig options") {
+val key = "spark-config-clone"
+activeSession.conf.set(key, "active")
+
+// inheritance
+val forkedSession = activeSession.cloneSession()
+assert(forkedSession ne activeSession)
+assert(forkedSession.conf ne activeSession.conf)
+assert(forkedSession.conf.get(key) == "active")
+
+// independence
+forkedSession.conf.set(key, "forked")
+assert(activeSession.conf.get(key) == "active")
+activeSession.conf.set(key, "dontcopyme")
+assert(forkedSession.conf.get(key) == "forked")
+  }
+
+  test("fork new session and inherit function registry and udf") {
+activeSession.udf.register("strlenScala", (_: String).length + (_: 
Int))
+val forkedSession = activeSession.cloneSession()
+
+// inheritance
+assert(forkedSession ne activeSession)
+assert(forkedSession.sessionState.functionRegistry ne
+  activeSession.sessionState.functionRegistry)
+
assert(forkedSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+
+// independence
+forkedSession.sessionState.functionRegistry.dropFunction("strlenScala")
+
assert(activeSession.sessionState.functionRegistry.lookupFunction("strlenScala").nonEmpty)
+activeSession.udf.register("addone", (_: Int) + 1)
+
assert(forkedSession.sessionState.functionRegistry.lookupFunction("addone").isEmpty)
+  }
+
+  test("fork new session and inherit experimental methods") {
+object DummyRule1 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+object DummyRule2 extends Rule[LogicalPlan] {
+  def apply(p: LogicalPlan): LogicalPlan = p
+}
+val optimizations = List(DummyRule1, DummyRule2)
+
+activeSession.experimental.extraOptimizations = optimizations
+
--- End diff --

removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103337066
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
@@ -136,6 +139,26 @@ private[sql] class SharedState(val sparkContext: 
SparkContext) extends Logging {
 }
 SparkSession.sqlListener.get()
   }
+
+  /*
+   * This belongs here more than in `SessionState`. However, does not seem 
that it can be
--- End diff --

Removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103336475
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the 
given `SparkSession`
*/
-  lazy val functionRegistry: FunctionRegistry = 
FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = 
queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103295794
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 ---
@@ -1196,4 +1198,28 @@ class SessionCatalogSuite extends PlanTest {
   catalog.listFunctions("unknown_db", "func*")
 }
   }
+
+  test("copy SessionCatalog") {
--- End diff --

changed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103347559
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala ---
@@ -493,6 +493,28 @@ class CatalogSuite
 }
   }
 
+  test("clone Catalog") {
+// need to test tempTables are cloned
+assert(spark.catalog.listTables().collect().isEmpty)
+
+createTempTable("my_temp_table")
+assert(spark.catalog.listTables().collect().map(_.name).toSet == 
Set("my_temp_table"))
+
+// inheritance
+val forkedSession = spark.cloneSession()
+assert(spark ne forkedSession)
+assert(spark.catalog ne forkedSession.catalog)
+assert(forkedSession.catalog.listTables().collect().map(_.name).toSet 
== Set("my_temp_table"))
+
+// independence
+dropTable("my_temp_table") // drop table in original session
+assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
+assert(forkedSession.catalog.listTables().collect().map(_.name).toSet 
== Set("my_temp_table"))
+forkedSession.sessionState.catalog
+  .createTempView("fork_table", Range(1, 2, 3, 4), overrideIfExists = 
true)
+assert(spark.catalog.listTables().collect().map(_.name).toSet == Set())
+  }
+
   // TODO: add tests for the rest of them
--- End diff --

Quite a few tests have been added since that TODO was written.
This is a large interface, so the tests are scattered across this suite and 
`GlobalTempViewSuite`, `CachedTableSuite`, 
`PartitionProviderCompatibilitySuite`, `(Hive)MetadataCacheSuite`, `DDLSuite`, and 
`ParquetQuerySuite`, to name a few.
That said, I do see at least one test for every method in the `Catalog` trait.
I checked with @cloud-fan in person; we should be fine with removing this 
TODO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103331408
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -17,89 +17,50 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
-import org.apache.spark.sql.execution.SparkPlanner
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlanner, 
SparkSqlParser}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.streaming.StreamingQueryManager
 
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]] backed by Hive.
  */
-private[hive] class HiveSessionState(sparkSession: SparkSession)
-  extends SessionState(sparkSession) {
-
-  self =>
-
-  /**
-   * A Hive client used for interacting with the metastore.
-   */
-  lazy val metadataHive: HiveClient =
-
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.newSession()
-
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  override lazy val catalog = {
-new HiveSessionCatalog(
-  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
-  sparkSession.sharedState.globalTempViewManager,
-  sparkSession,
-  functionResourceLoader,
-  functionRegistry,
+private[hive] class HiveSessionState(
+sparkContext: SparkContext,
+sharedState: SharedState,
+conf: SQLConf,
+experimentalMethods: ExperimentalMethods,
+functionRegistry: FunctionRegistry,
+override val catalog: HiveSessionCatalog,
+sqlParser: ParserInterface,
+val metadataHive: HiveClient,
+override val analyzer: Analyzer,
--- End diff --

The previous implementation needed it to be that way, but we can remove `override` 
now. Good catch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103336696
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the 
given `SparkSession`
*/
-  lazy val functionRegistry: FunctionRegistry = 
FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = 
queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
+
+val sparkContext = sparkSession.sparkContext
+
+// SQL-specific key-value configurations.
--- End diff --

changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103302329
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the 
given `SparkSession`
*/
-  lazy val functionRegistry: FunctionRegistry = 
FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = 
queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
+
+val sparkContext = sparkSession.sparkContext
+
+// SQL-specific key-value configurations.
+val sqlConf = conf.getOrElse(new SQLConf)
+
+// Automatically extract all entries and put them in our SQLConf
+mergeSparkConf(sqlConf, sparkContext.getConf)
+
+// Internal catalog for managing functions registered by the user.
+val functionRegistry = FunctionRegistry.builtin.clone()
+
+// A class for loading resources specified by a function.
+val functionResourceLoader: FunctionResourceLoader =
+  createFunctionResourceLoader(sparkContext, sparkSession.sharedState)
+
+// Parser that extracts expressions, plans, table identifiers etc. 
from SQL texts.
+val sqlParser: ParserInterface = new SparkSqlParser(sqlConf)
+
+// Internal catalog for managing table and database states.
+val catalog = new SessionCatalog(
+  sparkSession.sharedState.externalCatalog,
+  sparkSession.sharedState.globalTempViewManager,
+  functionResourceLoader,
+  functionRegistry,
+  sqlConf,
+  newHadoopConf(sparkContext.hadoopConfiguration, sqlConf),
+  sqlParser)
+
+// Logical query plan analyzer for resolving unresolved attributes and 
relations.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, sqlConf)
+
+// Interface to start and stop [[StreamingQuery]]s.
+val streamingQueryManager: StreamingQueryManager = new 
StreamingQueryManager(sparkSession)
+
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(sparkSession, plan)
+
+new SessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  sqlConf,
+  new ExperimentalMethods,
+  functionRegistry,
+  catalog,
+  sqlParser,
+  analyzer,
+  streamingQueryManager,
+  queryExecutionCreator)
+  }
+
+  def createFunctionResourceLoader(
--- End diff --

`createFunctionResourceLoader` is also used in `HiveSessionState.apply`; 
making it private would render it inaccessible there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not w

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103295639
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -1178,4 +1181,36 @@ class SessionCatalog(
 }
   }
 
+  /**
+   * Get an identical copy of the `SessionCatalog`.
+   * The temporary tables and function registry are retained.
--- End diff --

Changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307776
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  
newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, 
confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): HiveSessionState = {
+
+val initHelper = SessionState(sparkSession, conf)
+
+val sparkContext = sparkSession.sparkContext
+
+val catalog = HiveSessionCatalog(
+  sparkSession,
+  SessionState.createFunctionResourceLoader(sparkContext, 
sparkSession.sharedState),
+  initHelper.functionRegistry,
+  initHelper.conf,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
initHelper.conf),
+  initHelper.sqlParser)
+
+// A Hive client used for interacting with the metastore.
+val metadataHive: HiveClient =
+  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+// An analyzer that uses the Hive metastore.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, 
initHelper.conf)
+
+val plannerCreator = createPlannerCreator(
+  sparkSession,
+  initHelper.conf,
+  initHelper.experimentalMethods)
+
+new HiveSessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  initHelper.conf,
+  initHelper.experimentalMethods,
+  initHelper.functionRegistry,
+  catalog,
+  initHelper.sqlParser,
+  metadataHive,
+  analyzer,
+  initHelper.streamingQueryManager,
+  initHelper.queryExecutionCreator,
+  plannerCreator)
+  }
+
+  def createAnalyzer(
+  sparkSession: SparkSession,
+  catalog: HiveSessionCatalog,
+  sqlConf: SQLConf): Analyzer = {
+
+new Analyzer(catalog, sqlConf) {
+  override val extendedResolutionRules =
+new ResolveHiveSerdeTable(sparkSession) ::
+new FindDataSourceTable(sparkSession) ::
+new FindHiveSerdeTable(sparkSession) ::
+new ResolveSQLOnFile(sparkSession) :: Nil
+
+  override val postHocResolutionRules =
+catalog.ParquetConversions ::
+catalog.OrcConversions ::
+PreprocessTableCreation(sparkSession) ::
+PreprocessTableInsertion(sqlConf) ::
+DataSourceAnalysis(sqlConf) ::
+HiveAnalysis :: Nil
+
+  override val extendedCheckRules = Seq(PreWriteCheck)
+}
+  }
+
+  def createPlannerCreator(
+  associatedSparkSession: SparkSession,
+  sqlConf: SQLConf,
+  experimentalMethods: ExperimentalMethods): () => SparkP

[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103328320
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala 
---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class HiveSessionStateSuite extends SessionStateSuite
+  with TestHiveSingleton with BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {}
+
+  override def createSession(): Unit = {
+activeSession = spark.newSession()
+  }
+
--- End diff --

removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103303272
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -90,110 +203,37 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 }
   }
 
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  lazy val catalog = new SessionCatalog(
-sparkSession.sharedState.externalCatalog,
-sparkSession.sharedState.globalTempViewManager,
-functionResourceLoader,
-functionRegistry,
-conf,
-newHadoopConf(),
-sqlParser)
+  def newHadoopConf(copyHadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
--- End diff --

Changed the name.
This is also used in `HiveSessionState.apply`, so it would be rendered 
inaccessible if made `private`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103295692
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -1178,4 +1181,36 @@ class SessionCatalog(
 }
   }
 
+  /**
+   * Get an identical copy of the `SessionCatalog`.
+   * The temporary tables and function registry are retained.
+   * The table relation cache will not be populated.
+   * @note `externalCatalog` and `globalTempViewManager` are from shared 
state, don't need deep
+   * copy. `FunctionResourceLoader` is effectively stateless, also does 
not need deep copy.
+   * All arguments passed in should be associated with a particular 
`SparkSession`.
+   */
+  def clone(
+  conf: CatalystConf,
+  hadoopConf: Configuration,
+  functionRegistry: FunctionRegistry,
+  parser: ParserInterface): SessionCatalog = {
+
+val catalog = new SessionCatalog(
+  externalCatalog,
+  globalTempViewManager,
+  functionResourceLoader,
+  functionRegistry,
+  conf,
+  hadoopConf,
+  parser)
+
+synchronized {
+  catalog.currentDb = currentDb
+  // copy over temporary tables
+  tempTables.foreach(kv => catalog.tempTables.put(kv._1, kv._2))
+}
+
+catalog
+  }
+
--- End diff --

Removed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103306212
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
@@ -136,6 +139,26 @@ private[sql] class SharedState(val sparkContext: 
SparkContext) extends Logging {
 }
 SparkSession.sqlListener.get()
   }
+
+  /*
+   * This belongs here more than in `SessionState`. However, does not seem 
that it can be
+   * removed from `SessionState` and `HiveSessionState` without using 
reflection in
+   * `AddJarCommand`.
+   */
+  def addJar(path: String): Unit = {
+sparkContext.addJar(path)
+
--- End diff --

Removed.
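
For illustration, here is a minimal self-contained sketch of the split described in the quoted comment, using toy stand-in class names rather than the actual Spark classes: the jar is registered once on the shared, session-independent state, while the per-session state keeps a thin forwarder so callers such as `AddJarCommand` continue to work without reflection.

    import org.apache.spark.SparkContext

    // Toy stand-ins for SharedState / SessionState, for illustration only.
    class SharedStateSketch(sparkContext: SparkContext) {
      def addJar(path: String): Unit = {
        // Registered once, on the single session-independent SparkContext.
        sparkContext.addJar(path)
      }
    }

    class SessionStateSketch(sharedState: SharedStateSketch) {
      // Thin forwarder: per-session callers keep the same entry point.
      def addJar(path: String): Unit = sharedState.addJar(path)
    }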





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307299
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala ---
@@ -212,3 +247,31 @@ private[sql] class HiveSessionCatalog(
 "histogram_numeric"
   )
 }
+
+private[sql] object HiveSessionCatalog {
+
+  def apply(
+  sparkSession: SparkSession,
+  functionResourceLoader: FunctionResourceLoader,
+  functionRegistry: FunctionRegistry,
+  conf: SQLConf,
+  hadoopConf: Configuration,
+  parser: ParserInterface): HiveSessionCatalog = {
+
+// Catalog for handling data source tables. TODO: This really doesn't 
belong here since it is
+// essentially a cache for metastore tables. However, it relies on a 
lot of session-specific
+// things so it would be a lot of work to split its functionality 
between HiveSessionCatalog
+// and HiveCatalog. We should still do it at some point...
+val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
+
+new HiveSessionCatalog(
+  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog],
+  sparkSession.sharedState.globalTempViewManager,
+  metastoreCatalog,
+  functionResourceLoader: FunctionResourceLoader,
--- End diff --

Removed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103329699
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---
@@ -144,11 +145,37 @@ private[hive] class TestHiveSparkSession(
 existingSharedState.getOrElse(new SharedState(sc))
   }
 
-  // TODO: Let's remove TestHiveSessionState. Otherwise, we are not really 
testing the reflection
-  // logic based on the setting of CATALOG_IMPLEMENTATION.
+  private def createHiveSessionState: HiveSessionState = {
--- End diff --

Neat, changed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103298123
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala ---
@@ -46,4 +46,10 @@ class ExperimentalMethods private[sql]() {
 
   @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil
 
+  override def clone(): ExperimentalMethods = {
--- End diff --

Good point, added a sync block.
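
To show what such a sync block can look like, here is a small self-contained sketch with a toy stand-in for `ExperimentalMethods` (simplified field types, not the code that was actually committed): both volatile fields are copied under one lock so the clone gets a mutually consistent snapshot even under concurrent updates.

    // Toy stand-in; the real class holds Seq[Strategy] and Seq[Rule[LogicalPlan]].
    class ExperimentalMethodsSketch {
      @volatile var extraStrategies: Seq[String] = Nil
      @volatile var extraOptimizations: Seq[String] = Nil

      override def clone(): ExperimentalMethodsSketch = {
        val result = new ExperimentalMethodsSketch
        synchronized {
          // Read both fields under the same lock so the pair stays consistent.
          result.extraStrategies = extraStrategies
          result.extraOptimizations = extraOptimizations
        }
        result
      }
    }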





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103297916
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 ---
@@ -1196,4 +1198,28 @@ class SessionCatalogSuite extends PlanTest {
   catalog.listFunctions("unknown_db", "func*")
 }
   }
+
+  test("copy SessionCatalog") {
+val externalCatalog = newEmptyCatalog()
+val original = new SessionCatalog(externalCatalog)
+val tempTable1 = Range(1, 10, 1, 10)
+original.createTempView("copytest1", tempTable1, overrideIfExists = 
false)
+
+// check if tables copied over
+val clone = original.clone(
+  SimpleCatalystConf(caseSensitiveAnalysis = true),
+  new Configuration(),
+  new SimpleFunctionRegistry,
+  CatalystSqlParser)
+assert(original ne clone)
+assert(clone.getTempView("copytest1") == Option(tempTable1))
+
+// check if clone and original independent
+clone.dropTable(TableIdentifier("copytest1"), ignoreIfNotExists = 
false, purge = false)
+assert(original.getTempView("copytest1") == Option(tempTable1))
+
+val tempTable2 = Range(1, 20, 2, 10)
--- End diff --

Added a test for this.
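
A sketch of the kind of independence check that was added, reusing the `original` and `clone` values from the quoted test (view name assumed, not necessarily the exact assertions committed):

    // A temp view registered on the clone must not leak back into the original.
    val tempTable2 = Range(1, 20, 2, 10)
    clone.createTempView("copytest2", tempTable2, overrideIfExists = false)
    assert(clone.getTempView("copytest2") == Option(tempTable2))
    assert(original.getTempView("copytest2").isEmpty)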





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307383
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  
newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, 
confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
--- End diff --

Changed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103305545
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -90,110 +203,37 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 }
   }
 
-  /**
-   * Internal catalog for managing table and database states.
-   */
-  lazy val catalog = new SessionCatalog(
-sparkSession.sharedState.externalCatalog,
-sparkSession.sharedState.globalTempViewManager,
-functionResourceLoader,
-functionRegistry,
-conf,
-newHadoopConf(),
-sqlParser)
+  def newHadoopConf(copyHadoopConf: Configuration, sqlConf: SQLConf): 
Configuration = {
+val hadoopConf = new Configuration(copyHadoopConf)
+sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) 
hadoopConf.set(k, v) }
+hadoopConf
+  }
 
-  /**
-   * Interface exposed to the user for registering user-defined functions.
-   * Note that the user-defined functions must be deterministic.
-   */
-  lazy val udf: UDFRegistration = new UDFRegistration(functionRegistry)
+  def createAnalyzer(
--- End diff --

Added docs, changed to private.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103307420
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---
@@ -146,4 +107,153 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
 conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  /**
+   * Get an identical copy of the `HiveSessionState`.
+   * This should ideally reuse the `SessionState.clone` but cannot do so.
+   * Doing that will throw an exception when trying to clone the catalog.
+   */
+  override def clone(newSparkSession: SparkSession): HiveSessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val experimentalMethodsCopy = experimentalMethods.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  newSparkSession,
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+val hiveClient =
+  
newSparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new HiveSessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethodsCopy,
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  hiveClient,
+  HiveSessionState.createAnalyzer(newSparkSession, catalogCopy, 
confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator,
+  HiveSessionState.createPlannerCreator(
+newSparkSession,
+confCopy,
+experimentalMethodsCopy))
+  }
+
+}
+
+object HiveSessionState {
+
+  def apply(sparkSession: SparkSession): HiveSessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): HiveSessionState = {
+
+val initHelper = SessionState(sparkSession, conf)
+
+val sparkContext = sparkSession.sparkContext
+
+val catalog = HiveSessionCatalog(
+  sparkSession,
+  SessionState.createFunctionResourceLoader(sparkContext, 
sparkSession.sharedState),
+  initHelper.functionRegistry,
+  initHelper.conf,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
initHelper.conf),
+  initHelper.sqlParser)
+
+// A Hive client used for interacting with the metastore.
+val metadataHive: HiveClient =
+  
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+.newSession()
+
+// An analyzer that uses the Hive metastore.
+val analyzer: Analyzer = createAnalyzer(sparkSession, catalog, 
initHelper.conf)
+
+val plannerCreator = createPlannerCreator(
+  sparkSession,
+  initHelper.conf,
+  initHelper.experimentalMethods)
+
+new HiveSessionState(
+  sparkContext,
+  sparkSession.sharedState,
+  initHelper.conf,
+  initHelper.experimentalMethods,
+  initHelper.functionRegistry,
+  catalog,
+  initHelper.sqlParser,
+  metadataHive,
+  analyzer,
+  initHelper.streamingQueryManager,
+  initHelper.queryExecutionCreator,
+  plannerCreator)
+  }
+
+  def createAnalyzer(
--- End diff --

Yes!





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103336676
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -65,22 +82,118 @@ private[sql] class SessionState(sparkSession: 
SparkSession) {
 hadoopConf
   }
 
-  lazy val experimentalMethods = new ExperimentalMethods
-
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Get an identical copy of the `SessionState` and associate it with the 
given `SparkSession`
*/
-  lazy val functionRegistry: FunctionRegistry = 
FunctionRegistry.builtin.copy()
+  def clone(newSparkSession: SparkSession): SessionState = {
+val sparkContext = newSparkSession.sparkContext
+val confCopy = conf.clone()
+val functionRegistryCopy = functionRegistry.clone()
+val sqlParser: ParserInterface = new SparkSqlParser(confCopy)
+val catalogCopy = catalog.clone(
+  confCopy,
+  SessionState.newHadoopConf(sparkContext.hadoopConfiguration, 
confCopy),
+  functionRegistryCopy,
+  sqlParser)
+val queryExecutionCreator = (plan: LogicalPlan) => new 
QueryExecution(newSparkSession, plan)
+
+SessionState.mergeSparkConf(confCopy, sparkContext.getConf)
+
+new SessionState(
+  sparkContext,
+  newSparkSession.sharedState,
+  confCopy,
+  experimentalMethods.clone(),
+  functionRegistryCopy,
+  catalogCopy,
+  sqlParser,
+  SessionState.createAnalyzer(newSparkSession, catalogCopy, confCopy),
+  new StreamingQueryManager(newSparkSession),
+  queryExecutionCreator)
+  }
+
+  // --
+  //  Helper methods, partially leftover from pre-2.0 days
+  // --
+
+  def executePlan(plan: LogicalPlan): QueryExecution = 
queryExecutionCreator(plan)
+
+  def refreshTable(tableName: String): Unit = {
+catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def addJar(path: String): Unit = sharedState.addJar(path)
+}
+
+
+object SessionState {
+
+  def apply(sparkSession: SparkSession): SessionState = {
+apply(sparkSession, None)
+  }
+
+  def apply(
+  sparkSession: SparkSession,
+  conf: Option[SQLConf]): SessionState = {
+
--- End diff --

Removed.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103298622
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -17,43 +17,60 @@
 
 package org.apache.spark.sql.internal
 
-import java.io.File
-
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.streaming.{StreamingQuery, 
StreamingQueryManager}
+import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 
 /**
  * A class that holds all session-specific state in a given 
[[SparkSession]].
  */
-private[sql] class SessionState(sparkSession: SparkSession) {
+private[sql] class SessionState(
+sparkContext: SparkContext,
+sharedState: SharedState,
+val conf: SQLConf,
+val experimentalMethods: ExperimentalMethods,
+val functionRegistry: FunctionRegistry,
+val catalog: SessionCatalog,
+val sqlParser: ParserInterface,
+val analyzer: Analyzer,
+val streamingQueryManager: StreamingQueryManager,
+val queryExecutionCreator: LogicalPlan => QueryExecution) {
+
+  /**
+   * Interface exposed to the user for registering user-defined functions.
+   * Note that the user-defined functions must be deterministic.
+   */
+  val udf: UDFRegistration = new UDFRegistration(functionRegistry)
 
-  // Note: These are all lazy vals because they depend on each other (e.g. 
conf) and we
-  // want subclasses to override some of the fields. Otherwise, we would 
get a lot of NPEs.
+  // Logical query plan optimizer.
--- End diff --

Changed.
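
For context on the note that was removed, a small self-contained toy (not Spark code) showing the initialization-order problem that the old lazy vals, and now constructor injection, work around:

    // With plain vals, the superclass initializer runs before the subclass has
    // assigned its overriding field, so `conf` is still null at this point.
    class Base {
      val conf: Map[String, String] = Map("a" -> "1")
      val catalogSize: Int = conf.size // NPE while constructing Sub
    }

    class Sub extends Base {
      override val conf: Map[String, String] = Map("a" -> "1", "b" -> "2")
    }

    // new Sub()  // throws NullPointerException
    // Making the dependent fields `lazy val`, or passing them in through the
    // constructor as this PR does, avoids the ordering problem.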





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103308243
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionCatalogSuite.scala 
---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.internal.SQLConf
+
+class HiveSessionCatalogSuite extends SessionCatalogSuite {
--- End diff --

Good point, removed extends.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-27 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103308317
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSessionStateSuite.scala 
---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class HiveSessionStateSuite extends SessionStateSuite
+  with TestHiveSingleton with BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+createSession()
+  }
+
+  override def afterEach(): Unit = {}
+
+  override def createSession(): Unit = {
+activeSession = spark.newSession()
--- End diff --

Added comment.





[GitHub] spark pull request #16826: [SPARK-19540][SQL] Add ability to clone SparkSess...

2017-02-24 Thread kunalkhamar
Github user kunalkhamar commented on a diff in the pull request:

https://github.com/apache/spark/pull/16826#discussion_r103060505
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -1178,4 +1178,36 @@ class SessionCatalog(
 }
   }
 
+  /**
+   * Get an identical copy of the `SessionCatalog`.
+   * The temporary tables and function registry are retained.
+   * The table relation cache will not be populated.
+   * @note `externalCatalog` and `globalTempViewManager` are from shared 
state, don't need deep copy
+   * `FunctionResourceLoader` is effectively stateless, also does not need 
deep copy.
+   * All arguments passed in should be associated with a particular 
`SparkSession`.
+   */
+  def copy(
+  conf: CatalystConf,
+  hadoopConf: Configuration,
+  functionRegistry: FunctionRegistry,
+  parser: ParserInterface): SessionCatalog = {
+
+val catalog = new SessionCatalog(
+  externalCatalog,
+  globalTempViewManager,
+  functionResourceLoader,
+  functionRegistry,
+  conf,
+  hadoopConf,
+  parser)
+
+synchronized {
--- End diff --

Right, I should mention we decided to keep it `synchronized`. `catalog` is 
not yet accessible outside the local scope.
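
A compact sketch of the pattern being described, using generic collections rather than the actual catalog types: the source object's monitor is held while its mutable state is read, and the freshly built copy needs no lock of its own because it has not yet been published to other threads.

    import scala.collection.mutable

    def snapshot(source: mutable.Map[String, Int]): mutable.Map[String, Int] = {
      val copy = mutable.HashMap[String, Int]()
      source.synchronized {
        // Lock the *source* so concurrent writers cannot mutate it mid-copy.
        source.foreach { case (k, v) => copy.put(k, v) }
      }
      // `copy` was confined to this thread until now, so it needs no locking.
      copy
    }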




