[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/9812

[SPARK-11818][REPL] Fix ExecutorClassLoader to lookup resources from …

…parent class loader

Without this patch, two additional tests in ExecutorClassLoaderSuite fail:

- "resource from parent"
- "resources from parent"

A detailed explanation is here:
https://issues.apache.org/jira/browse/SPARK-11818?focusedCommentId=15011202&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15011202
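For context, a rough sketch of the kind of check the new tests perform (hypothetical code; the constructor follows the signature shown in the review diff below, and the actual suite uses its own fixtures):

```scala
import java.io.{File, FileWriter}
import java.net.URLClassLoader

import org.apache.spark.SparkConf
import org.apache.spark.repl.ExecutorClassLoader

// A resource that exists only on the parent class loader's classpath should be
// visible when looked up through ExecutorClassLoader.
val resourceDir = new File(System.getProperty("java.io.tmpdir"), "parent-resources")
resourceDir.mkdirs()
val writer = new FileWriter(new File(resourceDir, "fake-resource.txt"))
writer.write("resource contents")
writer.close()

val parentLoader = new URLClassLoader(Array(resourceDir.toURI.toURL), null)
val classLoader = new ExecutorClassLoader(
  new SparkConf(), "file:" + resourceDir.getAbsolutePath, parentLoader, false)
assert(classLoader.getResource("fake-resource.txt") != null)
```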

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-11818

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/9812.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #9812


commit 698159a0a1fcf8c0ba042daf875d037df3f8ed6f
Author: Jungtaek Lim 
Date:   2015-11-18T15:21:18Z

[SPARK-11818][REPL] Fix ExecutorClassLoader to lookup resources from parent 
class loader

* Without this patch, some tests in ExecutorClassLoaderSuite fail




---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/9812#discussion_r45260904
  
--- Diff: core/src/main/scala/org/apache/spark/TestUtils.scala ---
@@ -159,6 +159,16 @@ private[spark] object TestUtils {
 createCompiledClass(className, destDir, sourceFile, classpathUrls)
   }
 
+  def createResource(
--- End diff --

@srowen Thanks, I'll inline it.


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/9812#discussion_r45261349
  
--- Diff: 
repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---
@@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: 
String, parent: ClassLoader
 }
   }
 
+  override def getResource(name: String): URL = {
+parentLoader.getResource(name)
--- End diff --

@srowen 
It doesn't need to check ```userClassPathFirst```, since this implementation assumes the REPL never provides resources dynamically, so there's no need to look up resources from ExecutorClassLoader itself.

Btw, could that precondition be broken? I can't imagine the REPL generating resources.
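
For reference, a minimal sketch of the delegation being discussed (essentially what the diff above shows, plus the plural variant; not necessarily the final shape of the patch):

```scala
// ExecutorClassLoader only serves classes compiled by the REPL, so resource
// lookups are simply delegated to the parent class loader.
override def getResource(name: String): URL = {
  parentLoader.getResource(name)
}

override def getResources(name: String): java.util.Enumeration[URL] = {
  parentLoader.getResources(name)
}
```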


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/9812#discussion_r45285698
  
--- Diff: 
repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---
@@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: 
String, parent: ClassLoader
 }
   }
 
+  override def getResource(name: String): URL = {
+parentLoader.getResource(name)
--- End diff --

@vanzin 
I haven't seen the use case you mentioned, but it could make sense.

To achieve that, we would have to implement findResource() and findResources() in ExecutorClassLoader, since ExecutorClassLoader cannot rely on its superclass (ClassLoader) to load classes / resources.
It is easy to provide a resource URL which points to the origin scheme (http, https, ftp, hdfs), but since I'm new to class loaders, I'm wondering whether it is safe to return a URL from findResource() and findResources() that doesn't point to a local file.

If it is not safe to return a non-local file as a URL, what's the recommended way to do it?
The only option I can think of is downloading the files to a local temp directory on every call.
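
For illustration only, a rough sketch of that direction (hypothetical, assuming a `uri` field holding the REPL class URI; this is not what the PR currently does):

```scala
import java.net.URL
import java.util.Collections

// Hypothetical: expose resources under the REPL class URI by returning a URL that
// points at the origin scheme (http, https, ftp, hdfs). Whether callers can safely
// open such a non-local URL is exactly the open question above.
override def findResource(name: String): URL = {
  try new URL(s"$uri/$name") catch { case _: java.net.MalformedURLException => null }
}

override def findResources(name: String): java.util.Enumeration[URL] = {
  val found = findResource(name)
  if (found == null) Collections.emptyEnumeration[URL]()
  else Collections.enumeration(Collections.singletonList(found))
}
```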


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/9812#discussion_r45288354
  
--- Diff: 
repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---
@@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: 
String, parent: ClassLoader
 }
   }
 
+  override def getResource(name: String): URL = {
+parentLoader.getResource(name)
--- End diff --

@vanzin 
To clarify about "feature": do you want me to change the implementation of findResource() and findResources() to point at the origin scheme and not worry about the potential oddity? Or should I forget about finding resources from the REPL URI and leave the PR as it is?


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-18 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/9812#discussion_r45288990
  
--- Diff: 
repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala ---
@@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: 
String, parent: ClassLoader
 }
   }
 
+  override def getResource(name: String): URL = {
+parentLoader.getResource(name)
--- End diff --

@vanzin 
OK, thanks for the clarification! :)


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the pull request:

https://github.com/apache/spark/pull/9812#issuecomment-158338205
  
@vanzin Thanks for reviewing; I addressed your comment. Please take another look.


---



[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...

2015-11-20 Thread HeartSaVioR
Github user HeartSaVioR commented on the pull request:

https://github.com/apache/spark/pull/9812#issuecomment-158377559
  
The failed tests seem unrelated.


---



[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21136
  
LGTM. This matches what I have found so far today as well.


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21152

[SPARK-23688][SS] Refactor tests away from rate source

## What changes were proposed in this pull request?

Replace the rate source with the memory source in continuous mode. Keep using the "rate" source when the tests intend to add data periodically in the background, or need a short source name, since "memory" doesn't have a source provider.
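
For illustration, the shape of the change, condensed from the diffs reviewed below (it relies on the suite's existing testStream/AddData helpers):

```scala
// Before: drive the test with the rate source and wait for wall-clock triggers.
val rateDf = spark.readStream
  .format("rate")
  .option("numPartitions", "5")
  .option("rowsPerSecond", "5")
  .load()
  .select('value)
  .map(r => r.getLong(0) * 2)

// After: drive the test with ContinuousMemoryStream, so data is added explicitly
// and the test no longer depends on timing.
val input = ContinuousMemoryStream[Int]
val df = input.toDF().map(_.getInt(0) * 2)
testStream(df)(
  AddData(input, 0, 1, 2),
  CheckAnswer(0, 2, 4))
```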

## How was this patch tested?

Ran the relevant test suites from the IDE.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-23688

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21152


commit 5aac856b3ef0118d174f016fc6a476f0facf174b
Author: Jungtaek Lim 
Date:   2018-04-25T09:46:30Z

[SPARK-23688][SS] Refactor tests away from rate source

* replace rate source with memory source in continuous mode




---




[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21152
  
@jose-torres Please review this PR. Thanks!
cc. @jerryshao @HyukjinKwon 


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184006570
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
 val input = ContinuousMemoryStream[Int]
 
 testStream(input.toDF())(
-  AddData(input, 0, 1, 2),
-  CheckAnswer(0, 1, 2),
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2): _*),
   StopStream,
-  AddData(input, 3, 4, 5),
+  AddData(input, 3.to(5): _*),
   StartStream(),
-  CheckAnswer(0, 1, 2, 3, 4, 5))
+  CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .map(r => r.getLong(0) * 2)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().map(_.getInt(0) * 2)
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).map(_ * 2): _*),
+  StopStream,
+  AddData(input, 3.to(5): _*),
+  StartStream(),
+  CheckAnswer(0.to(5).map(_ * 2): _*))
   }
 
   test("flatMap") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2))
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 
2))
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n 
* 2)).map(Row(_
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*),
+  StopStream,
+  AddData(input, 3.to(5): _*),
+  StartStream(),
+  CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*))
   }
 
   test("filter") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .where('value > 5)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().where('value > 5)
--- End diff --

I intended to use the untyped filter because of SPARK-24061. Once #21136 is merged we could change this, but I'm not sure we want to have both untyped and typed variants for every test (a sketch of both forms is below).
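
For reference, the two forms look roughly like this (the typed variant is a sketch assuming #21136 lands):

```scala
val input = ContinuousMemoryStream[Int]

// Untyped (Column-based) filter, as used in this PR:
val untyped = input.toDF().where('value > 5)

// Typed (lambda-based) filter, which needs TypedFilter support in continuous mode (SPARK-24061):
val typed = input.toDF().filter(_.getInt(0) > 5)
```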


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184213878
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
 val input = ContinuousMemoryStream[Int]
 
 testStream(input.toDF())(
-  AddData(input, 0, 1, 2),
-  CheckAnswer(0, 1, 2),
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2): _*),
   StopStream,
-  AddData(input, 3, 4, 5),
+  AddData(input, 3.to(5): _*),
   StartStream(),
-  CheckAnswer(0, 1, 2, 3, 4, 5))
+  CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .map(r => r.getLong(0) * 2)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().map(_.getInt(0) * 2)
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).map(_ * 2): _*),
--- End diff --

@jose-torres 
Yeah, my intention was to assert that the Spark operations behave the same as the corresponding Scala collection methods, but sure, enumerating is also fine since we all know the expected result easily.
Are you in favor of enumerating the literals we already know instead of calculating them, for all the tests, or only for this line? I'd just like to apply the approach consistently.
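
Concretely, the choice is between the calculated form and enumerating the literals, e.g.:

```scala
// calculated from the input range
CheckAnswer(0.to(2).map(_ * 2): _*)
// vs. enumerating the values we already know
CheckAnswer(0, 2, 4)
```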


---




[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21152
  
Also cc @tdas, since he has been reviewing SS-related PRs (including continuous mode) so far.


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184236837
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
 val input = ContinuousMemoryStream[Int]
 
 testStream(input.toDF())(
-  AddData(input, 0, 1, 2),
-  CheckAnswer(0, 1, 2),
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2): _*),
   StopStream,
-  AddData(input, 3, 4, 5),
+  AddData(input, 3.to(5): _*),
   StartStream(),
-  CheckAnswer(0, 1, 2, 3, 4, 5))
+  CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .map(r => r.getLong(0) * 2)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().map(_.getInt(0) * 2)
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).map(_ * 2): _*),
+  StopStream,
+  AddData(input, 3.to(5): _*),
+  StartStream(),
+  CheckAnswer(0.to(5).map(_ * 2): _*))
   }
 
   test("flatMap") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2))
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 
2))
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n 
* 2)).map(Row(_
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*),
+  StopStream,
+  AddData(input, 3.to(5): _*),
+  StartStream(),
+  CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*))
   }
 
   test("filter") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .where('value > 5)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().where('value > 5)
--- End diff --

@jose-torres What do you think about this? Would it be better to have tests for both the untyped and the typed versions? The code duplication wouldn't be huge, since I guess the verification logic can be reused for every test.


---




[GitHub] spark pull request #21063: [SPARK-23886][Structured Streaming] Update query ...

2018-04-25 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21063#discussion_r184241625
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 ---
@@ -111,7 +112,12 @@ trait ProgressReporter extends Logging {
 logDebug("Starting Trigger Calculation")
 lastTriggerStartTimestamp = currentTriggerStartTimestamp
 currentTriggerStartTimestamp = triggerClock.getTimeMillis()
-currentStatus = currentStatus.copy(isTriggerActive = true)
+// isTriggerActive field is kept false for ContinuousExecution
+// since it is tied to MicroBatchExecution
+this match {
--- End diff --

nit: someone may be concerned that a trait needs to be aware of its concrete implementations.

There look to be two options (a sketch of the first is below):
1. Extract a method that only updates currentStatus for the starting trigger, defaulting to `isTriggerActive = true`, and let `ContinuousExecution` override that method.
2. Just override `startTrigger()` in `ContinuousExecution`: call `super.startTrigger()` and then update currentStatus once again. It might open a very small window for other threads to read invalid status information (isTriggerActive = true), but it requires fewer changes if that is acceptable.
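
A minimal sketch of option 1, with hypothetical method names (not the actual patch):

```scala
// In the ProgressReporter trait: the default behavior marks the trigger active.
protected def updateStatusForTriggerStart(): Unit = {
  currentStatus = currentStatus.copy(isTriggerActive = true)
}

protected def startTrigger(): Unit = {
  // ... existing trigger bookkeeping ...
  updateStatusForTriggerStart()
}

// In ContinuousExecution: there is no micro-batch trigger, so keep the flag false.
override protected def updateStatusForTriggerStart(): Unit = {
  currentStatus = currentStatus.copy(isTriggerActive = false)
}
```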


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184321762
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
 val input = ContinuousMemoryStream[Int]
 
 testStream(input.toDF())(
-  AddData(input, 0, 1, 2),
-  CheckAnswer(0, 1, 2),
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2): _*),
   StopStream,
-  AddData(input, 3, 4, 5),
+  AddData(input, 3.to(5): _*),
   StartStream(),
-  CheckAnswer(0, 1, 2, 3, 4, 5))
+  CheckAnswer(0.to(5): _*))
--- End diff --

Thanks for the pointer. I started with `(0 to 5)`, but the Spark Scala style guide mentions avoiding infix notation, so I was a bit puzzled (I wasn't sure whether `to` counts as an operator). Will update.


---




[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...

2018-04-26 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21152#discussion_r184322699
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 ---
@@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase {
 val input = ContinuousMemoryStream[Int]
 
 testStream(input.toDF())(
-  AddData(input, 0, 1, 2),
-  CheckAnswer(0, 1, 2),
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2): _*),
   StopStream,
-  AddData(input, 3, 4, 5),
+  AddData(input, 3.to(5): _*),
   StartStream(),
-  CheckAnswer(0, 1, 2, 3, 4, 5))
+  CheckAnswer(0.to(5): _*))
   }
 
   test("map") {
-val df = spark.readStream
-  .format("rate")
-  .option("numPartitions", "5")
-  .option("rowsPerSecond", "5")
-  .load()
-  .select('value)
-  .map(r => r.getLong(0) * 2)
+val input = ContinuousMemoryStream[Int]
+val df = input.toDF().map(_.getInt(0) * 2)
 
-testStream(df, useV2Sink = true)(
-  StartStream(longContinuousTrigger),
-  AwaitEpoch(0),
-  Execute(waitForRateSourceTriggers(_, 2)),
-  IncrementEpoch(),
-  Execute(waitForRateSourceTriggers(_, 4)),
-  IncrementEpoch(),
-  CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_
+testStream(df)(
+  AddData(input, 0.to(2): _*),
+  CheckAnswer(0.to(2).map(_ * 2): _*),
--- End diff --

Got it. Looks like we could reduce the range and list out literals. Will 
update.


---




[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source

2018-04-26 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21152
  
Thanks for reviewing. I have addressed the review comments.


---




[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source

2018-04-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21152
  
@jerryshao Thanks for merging! My Apache JIRA ID is “kabhwan”


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185198458
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.io.Closeable
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A wrapper for a continuous processing data reader, including a reading 
queue and epoch markers.
+ *
+ * This will be instantiated once per partition - successive calls to 
compute() in the
+ * [[ContinuousDataSourceRDD]] will reuse the same reader. This is 
required to get continuity of
+ * offsets across epochs.
+ *
+ * For performance reasons, this is very weakly encapsulated. There are 
six handles for the RDD:
+ *  * currentOffset - contains the offset of the most recent row which a 
compute() iterator has sent
+ *upwards. The RDD is responsible for advancing this.
+ *  * currentEpoch - the epoch which is currently occurring. The RDD is 
responsible for incrementing
+ *this before ending the compute() iterator.
+ *  * queue - the queue of incoming rows (row, offset) or epoch markers 
(null, null). The
+ *ContinuousQueuedDataReader writes into this queue, and RDD.compute() 
will read from it.
+ *  * {epochPoll|dataReader}Failed - flags to check if the epoch poll and 
data reader threads are
+ *still running. These threads won't be restarted if they fail, so the 
RDD should intercept
+ *this state when convenient to fail the query.
+ *  * close() - to close this reader when the query is going to shut down.
+ */
+class ContinuousQueuedDataReader(
+split: Partition,
+context: TaskContext,
+dataQueueSize: Int,
+epochPollIntervalMs: Long) extends Closeable {
+  private val reader = 
split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]]
+.readerFactory.createDataReader()
+
+  // Important sequencing - we must get our starting point before the 
provider threads start running
+  var currentOffset: PartitionOffset = 
ContinuousDataSourceRDD.getBaseReader(reader).getOffset
+  var currentEpoch: Long = 
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  // This queue contains two types of messages:
+  // * (null, null) representing an epoch boundary.
+  // * (row, off) containing a data row and its corresponding 
PartitionOffset.
+  val queue = new ArrayBlockingQueue[(UnsafeRow, 
PartitionOffset)](dataQueueSize)
+
+  val epochPollFailed = new AtomicBoolean(false)
+  val dataReaderFailed = new AtomicBoolean(false)
+
+  private val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+
+  private val epochPollExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+s"epoch-poll--$coordinatorId--${context.partitionId()}")
+  val epochPollRunnable = new EpochPollRunnable(queue, context, 
epochPollFailed)
+  epochPollExecutor.scheduleWithFixedDelay(
+epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS)
+
+  val dataReaderThread = new DataReaderThread(reader, queue, context, 
dataReaderFailed)
+  dataReaderThread.setDaemon(true)
+  dataReaderThread.start()
+
+  context.addTaskCompletionListener(_ => {
--- End diff --

Maybe it's better to just call `close()` here, if `this` is visible.


---


[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185187424
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
--- End diff --

This line is effectively a no-op unless we exit the loop afterwards, so it would be better to clarify the intended behavior and fix it.
I know this code block is just the same as the current code, so it might be off topic here. If we would like to address it in a separate issue, I'm happy to file one and work on it as well.


---


[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185194384
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
+  }
+  if (readerForPartition.dataReaderFailed.get()) {
+throw new SparkException(
+  "data read failed", 
readerForPartition.dataReaderThread.failureReason)
+  }
+  if (readerForPartition.epochPollFailed.get()) {
+throw new SparkException(
+  "epoch poll failed", 
readerForPartition.epochPollRunna

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185201032
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
 ---
@@ -46,28 +46,34 @@ case class WriteToContinuousDataSourceExec(writer: 
StreamWriter, query: SparkPla
   case _ => new 
InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
 }
 
-val rdd = query.execute()
+val rdd = new ContinuousWriteRDD(query.execute(), writerFactory)
+val messages = new Array[WriterCommitMessage](rdd.partitions.length)
 
 logInfo(s"Start processing data source writer: $writer. " +
-  s"The input RDD has ${rdd.getNumPartitions} partitions.")
-// Let the epoch coordinator know how many partitions the write RDD 
has.
+  s"The input RDD has ${messages.length} partitions.")
 EpochCoordinatorRef.get(
-
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
-sparkContext.env)
+  
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+  sparkContext.env)
   .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
 
 try {
   // Force the RDD to run so continuous processing starts; no data is 
actually being collected
   // to the driver, as ContinuousWriteRDD outputs nothing.
-  sparkContext.runJob(
-rdd,
-(context: TaskContext, iter: Iterator[InternalRow]) =>
-  WriteToContinuousDataSourceExec.run(writerFactory, context, 
iter),
-rdd.partitions.indices)
+  rdd.collect()
 } catch {
   case _: InterruptedException =>
-// Interruption is how continuous queries are ended, so accept and 
ignore the exception.
+  // Interruption is how continuous queries are ended, so accept and 
ignore the exception.
   case cause: Throwable =>
+logError(s"Data source writer $writer is aborting.")
--- End diff --

Could you please explain the need for the additional handling here, given that ContinuousWriteRDD already handles the error case?


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r18524
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+
+/**
+ * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates 
the queue with
+ * (null, null) when a new epoch marker arrives.
+ */
+class EpochPollRunnable(
+queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
+context: TaskContext,
+failedFlag: AtomicBoolean)
+  extends Thread with Logging {
+  private[continuous] var failureReason: Throwable = _
+
+  private val epochEndpoint = EpochCoordinatorRef.get(
+
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), 
SparkEnv.get)
+  // Note that this is *not* the same as the currentEpoch in 
[[ContinuousDataQueuedReader]]! That
+  // field represents the epoch wrt the data being processed. The 
currentEpoch here is just a
+  // counter to ensure we send the appropriate number of markers if we 
fall behind the driver.
+  private var currentEpoch = 
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+  override def run(): Unit = {
+try {
+  val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch)
+  for (i <- currentEpoch to newEpoch - 1) {
--- End diff --

Please correct me if I'm missing something. My understanding is that this situation (a gap bigger than 1) should only occur when the array queue gets full and blocks the epoch thread from putting a marker for longer than the trigger interval. Any other situation (error cases) should just crash the whole query so that recovery happens from scratch: that's why we can ignore the missing case.


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185282844
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
--- End diff --

I meant that the current logic still calls queue.poll again instead of using the assigned epoch-marker value, even when the if statement matches. That looks unintended, right?
We could rearrange the logic to fail fast on the exception cases and use if-else to fix that (a rough sketch is below).
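
A rough sketch of that arrangement (field names follow the diff above; this is untested):

```scala
override def hasNext(): Boolean = {
  while (currentEntry == null) {
    // Fail fast if either background thread has died.
    if (readerForPartition.dataReaderFailed.get()) {
      throw new SparkException(
        "data read failed", readerForPartition.dataReaderThread.failureReason)
    }
    if (readerForPartition.epochPollFailed.get()) {
      throw new SparkException(
        "epoch poll failed", readerForPartition.epochPollRunnable.failureReason)
    }
    if (context.isInterrupted() || context.isCompleted()) {
      // Task is finishing: treat it as an epoch boundary instead of polling again.
      currentEntry = (null, null)
    } else {
      currentEntry = readerForPartition.queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    }
  }
  // ... then handle the dequeued entry (epoch marker vs. data row) as before ...
  currentEntry._1 != null
}
```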


---




[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185316062
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
+  }
+  if (readerForPartition.dataReaderFailed.get()) {
+throw new SparkException(
+  "data read failed", 
readerForPartition.dataReaderThread.failureReason)
+  }
+  if (readerForPartition.epochPollFailed.get()) {
+throw new SparkException(
+  "epoch poll failed", 
readerForPartition.epochPollRunna

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185317000
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
--- End diff --

Yeah, that's what I also missed. Thanks for correcting. :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185326551
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
+  }
+  if (readerForPartition.dataReaderFailed.get()) {
+throw new SparkException(
+  "data read failed", 
readerForPartition.dataReaderThread.failureReason)
+  }
+  if (readerForPartition.epochPollFailed.get()) {
+throw new SparkException(
+  "epoch poll failed", 
readerForPartition.epochPollRunna

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r185328820
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
+  }
+  if (readerForPartition.dataReaderFailed.get()) {
+throw new SparkException(
+  "data read failed", 
readerForPartition.dataReaderThread.failureReason)
+  }
+  if (readerForPartition.epochPollFailed.get()) {
+throw new SparkException(
+  "epoch poll failed", 
readerForPartition.epochPollRunna

[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...

2018-05-01 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21200#discussion_r18520
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import java.util.concurrent.TimeUnit
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, 
RowToUnsafeDataReader}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, 
PartitionOffset}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * The bottom-most RDD of a continuous processing read task. Wraps a 
[[ContinuousQueuedDataReader]]
+ * to read from the remote source, and polls that queue for incoming rows.
+ *
+ * Note that continuous processing calls compute() multiple times, and the 
same
+ * [[ContinuousQueuedDataReader]] instance will/must be shared between 
each call for the same split.
+ */
+class ContinuousDataSourceRDD(
+sc: SparkContext,
+sqlContext: SQLContext,
+@transient private val readerFactories: 
Seq[DataReaderFactory[UnsafeRow]])
+  extends RDD[UnsafeRow](sc, Nil) {
+
+  private val dataQueueSize = 
sqlContext.conf.continuousStreamingExecutorQueueSize
+  private val epochPollIntervalMs = 
sqlContext.conf.continuousStreamingExecutorPollIntervalMs
+
+  // When computing the same partition multiple times, we need to use the 
same data reader to
+  // do so for continuity in offsets.
+  @GuardedBy("dataReaders")
+  private val dataReaders: mutable.Map[Partition, 
ContinuousQueuedDataReader] =
+mutable.Map[Partition, ContinuousQueuedDataReader]()
+
+  override protected def getPartitions: Array[Partition] = {
+readerFactories.zipWithIndex.map {
+  case (readerFactory, index) => new DataSourceRDDPartition(index, 
readerFactory)
+}.toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+// If attempt number isn't 0, this is a task retry, which we don't 
support.
+if (context.attemptNumber() != 0) {
+  throw new ContinuousTaskRetryException()
+}
+
+val readerForPartition = dataReaders.synchronized {
+  if (!dataReaders.contains(split)) {
+dataReaders.put(
+  split,
+  new ContinuousQueuedDataReader(split, context, dataQueueSize, 
epochPollIntervalMs))
+  }
+
+  dataReaders(split)
+}
+
+val coordinatorId = 
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
+val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, 
SparkEnv.get)
+new Iterator[UnsafeRow] {
+  private val POLL_TIMEOUT_MS = 1000
+
+  private var currentEntry: (UnsafeRow, PartitionOffset) = _
+
+  override def hasNext(): Boolean = {
+while (currentEntry == null) {
+  if (context.isInterrupted() || context.isCompleted()) {
+currentEntry = (null, null)
+  }
+  if (readerForPartition.dataReaderFailed.get()) {
+throw new SparkException(
+  "data read failed", 
readerForPartition.dataReaderThread.failureReason)
+  }
+  if (readerForPartition.epochPollFailed.get()) {
+throw new SparkException(
+  "epoch poll failed", 
readerForPartition.epochPollRunna

[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-05-02 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21222

[SPARK-24161][SS] Enable debug package feature on structured streaming

## What changes were proposed in this pull request?

Currently, the debug package has an implicit class "DebugQuery" which matches 
Dataset to provide debug features on the Dataset class. It doesn't work with 
structured streaming: it requires that the query is already started, and the 
information can be retrieved from the StreamingQuery, not the Dataset. I guess 
that's why "explain" had to be placed on StreamingQuery even though it already 
exists on Dataset.

This patch adds a new implicit class "DebugStreamQuery" which matches 
StreamingQuery to provide similar debug features on the StreamingQuery class.
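
To illustrate the shape of the change, here is a minimal sketch of the idea 
(illustrative only, not the exact patch; the real change additionally dumps the 
generated code of each WholeStageCodegen subtree):

```
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.streaming.StreamingQuery

// Illustrative sketch only: an implicit wrapper that lets callers inspect the
// physical plan of the last executed batch directly from a StreamingQuery.
object StreamingDebugSketch {
  implicit class DebugStreamQuery(query: StreamingQuery) {
    def debugPlan(): Unit = query match {
      case se: StreamExecution if se.lastExecution != null =>
        println(se.lastExecution.executedPlan)  // physical plan of the last batch
      case _ =>
        println("No batch has been executed yet; start the query and wait for a batch.")
    }
  }
}
```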

## How was this patch tested?

Added relevant unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24161

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21222.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21222


commit c1ad1c557e6165455457adb6f148d6d9616548a1
Author: Jungtaek Lim 
Date:   2018-05-03T02:26:48Z

SPARK-24161 Enable debug package feature on structured streaming

* added implicit class which adds debug features for StreamingQuery
* added unit tests for new functionalities




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-05-03 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
@tdas @jose-torres @jerryshao @arunmahadevan
Kindly ping to review.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21207: SPARK-24136: Fix MemoryStreamDataReader.next to skip sle...

2018-05-03 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21207
  
@tdas @jerryshao @HyukjinKwon 
Kindly ping to trigger test and review.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-05-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21222#discussion_r186032252
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---
@@ -88,23 +100,62 @@ package object debug {
 }
   }
 
+  /**
+   * Get WholeStageCodegenExec subtrees and the codegen in a query plan 
into one String
+   *
+   * @param query the streaming query for codegen
+   * @return single String containing all WholeStageCodegen subtrees and 
corresponding codegen
+   */
+  def codegenString(query: StreamingQuery): String = {
+val msg = query match {
+  case w: StreamExecution if w.lastExecution != null =>
--- End diff --

Addressed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...

2018-05-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21222#discussion_r186247474
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---
@@ -116,6 +168,30 @@ package object debug {
 }
   }
 
+  implicit class DebugStreamQuery(query: StreamingQuery) extends Logging {
+def debug(): Unit = {
+  query match {
+case w: StreamExecution =>
--- End diff --

My bad. Fixed. Also changed the unit tests so that the case you reported is 
covered by the tests.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...

2018-05-13 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21222
  
Kindly ping. I guess debugging the last batch might not be that attractive, but 
printing the codegen would be helpful to anyone who wants to investigate or 
debug in detail.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188636306
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+
+/**
+ * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or 
an epoch marker.
+ */
+private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable
+private[shuffle] case class ReceiverRow(row: UnsafeRow) extends 
UnsafeRowReceiverMessage
+private[shuffle] case class ReceiverEpochMarker() extends 
UnsafeRowReceiverMessage
+
+/**
+ * RPC endpoint for receiving rows into a continuous processing shuffle 
task.
+ */
+private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv)
+extends ThreadSafeRpcEndpoint with Logging {
+  private val queue = new 
ArrayBlockingQueue[UnsafeRowReceiverMessage](1024)
--- End diff --

I guess we can handle 2 as a TODO if we would like to keep the focus on the proposed patch.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188628202
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading 
from the
--- End diff --

If my understanding is right, the bottom would be the RDD that is injected just 
before shuffling, so it would be neither the reader nor the writer.

`first` and `last` would be good alternatives for me if `bottom` sounds ambiguous.

As @arunmahadevan stated, the comment looks incomplete.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188638980
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{DataType, IntegerType}
+
+class ContinuousShuffleReadSuite extends StreamTest {
+
+  private def unsafeRow(value: Int) = {
+UnsafeProjection.create(Array(IntegerType : DataType))(
+  new GenericInternalRow(Array(value: Any)))
+  }
+
+  var ctx: TaskContextImpl = _
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+ctx = TaskContext.empty()
+TaskContext.setTaskContext(ctx)
+  }
+
+  override def afterEach(): Unit = {
+ctx.markTaskCompleted(None)
+ctx = null
+super.afterEach()
+  }
+
+  test("receiver stopped with row last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.stopped.get())
+}
+  }
+
+  test("receiver stopped with marker last") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+ctx.markTaskCompleted(None)
+val receiver = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver
+eventually(timeout(streamingTimeout)) {
+  assert(receiver.stopped.get())
+}
+  }
+
+  test("one epoch") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val iter = rdd.compute(rdd.partitions(0), ctx)
+assert(iter.next().getInt(0) == 111)
+assert(iter.next().getInt(0) == 222)
+assert(iter.next().getInt(0) == 333)
+assert(!iter.hasNext)
+  }
+
+  test("multiple epochs") {
+val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(222)))
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(333)))
+endpoint.askSync[Unit](ReceiverEpochMarker())
+
+val firstEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(firstEpoch.next().getInt(0) == 111)
+assert(!firstEpoch.hasNext)
+
+val secondEpoch = rdd.compute(rdd.partitions(0), ctx)
+assert(secondEpoch.next().getInt(0) == 222)
+assert(secondEpoch.next().getInt(0) == 333)
+assert(!secondEpoch.hasNext)
+  }
+
+  test("empty epochs

[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...

2018-05-16 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21337#discussion_r188632188
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous.shuffle
+
+import java.util.UUID
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.NextIterator
+
+case class ContinuousShuffleReadPartition(index: Int) extends Partition {
+  // Initialized only on the executor, and only once even as we call 
compute() multiple times.
+  lazy val (receiver, endpoint) = {
+val env = SparkEnv.get.rpcEnv
+val receiver = new UnsafeRowReceiver(env)
+val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver)
+TaskContext.get().addTaskCompletionListener { ctx =>
+  env.stop(endpoint)
+}
+(receiver, endpoint)
+  }
+}
+
+/**
+ * RDD at the bottom of each continuous processing shuffle task, reading 
from the
+ */
+class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int)
+extends RDD[UnsafeRow](sc, Nil) {
+
+  override protected def getPartitions: Array[Partition] = {
+(0 until numPartitions).map(ContinuousShuffleReadPartition).toArray
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[UnsafeRow] = {
+val receiver = 
split.asInstanceOf[ContinuousShuffleReadPartition].receiver
+
+new NextIterator[UnsafeRow] {
+  override def getNext(): UnsafeRow = receiver.poll() match {
+case ReceiverRow(r) => r
+case ReceiverEpochMarker() =>
--- End diff --

Does this ensure at-least-once semantics? If so, we could start from this and 
improve it in another PR, as @jose-torres stated.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStorePr...

2018-05-17 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21357

[SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider to remove 
duplicate…

…d logic between operations on delta file and snapshot file

## What changes were proposed in this pull request?

This patch refactors HDFSBackedStateStoreProvider to extract the logic 
duplicated between operations on the delta file and the snapshot file, as well 
as documenting the structure of the state file.
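
To make the intent concrete, here is an illustrative-only sketch of the 
direction with hypothetical method names (the provider's real file format and 
APIs are more involved): both the delta path and the snapshot path delegate to 
one shared entry serializer.

```
import java.io.DataOutputStream

// Hypothetical sketch of "extracting duplicated logic": a single helper serializes
// one key/value entry, and both the delta writer and the snapshot writer reuse it.
object StateFileWriteSketch {
  private def writeEntry(out: DataOutputStream, key: Array[Byte], value: Array[Byte]): Unit = {
    out.writeInt(key.length)
    out.write(key)
    out.writeInt(value.length)
    out.write(value)
  }

  def writeUpdateToDeltaFile(out: DataOutputStream, key: Array[Byte], value: Array[Byte]): Unit =
    writeEntry(out, key, value)

  def writeSnapshotEntry(out: DataOutputStream, key: Array[Byte], value: Array[Byte]): Unit =
    writeEntry(out, key, value)
}
```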

## How was this patch tested?

Existing unit tests: StateStoreSuite

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24311

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21357.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21357


commit 8ad2a3f8112662a865ee1dbaf7c5269197c3ee4f
Author: Jungtaek Lim 
Date:   2018-05-17T21:17:30Z

SPARK-24311 Refactor HDFSBackedStateStoreProvider to remove duplicated 
logic between operations on delta file and snapshot file

* also removed unused import statements




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...

2018-05-17 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21357
  
cc. @tdas 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21388: [SPARK-24336][SQL] Support 'pass through' transfo...

2018-05-21 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21388

[SPARK-24336][SQL] Support 'pass through' transformation in BasicOperators

## What changes were proposed in this pull request?

Enable 'pass through' transformations in BasicOperators via reflection, so that 
every pair of plans which only requires converting a LogicalPlan to a SparkPlan 
by calling `planLater()` can be transformed automatically. One only needs to 
add the pair of plan classes to the map.
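
For readers who want to see the shape of the idea, here is a toy, 
self-contained illustration of reflection-based 'pass through' planning. It 
deliberately uses made-up node classes rather than Spark's planner types, and 
`plan(child)` stands in for `planLater(child)` in the real strategy.

```
// Toy model: logical/physical node pairs where the physical node can be built
// reflectively from the logical node's fields, converting logical children recursively.
sealed trait Logical extends Product
case class LScan(table: String) extends Logical
case class LFilter(condition: String, child: Logical) extends Logical

sealed trait Physical
case class PScan(table: String) extends Physical
case class PFilter(condition: String, child: Physical) extends Physical

object PassThroughSketch {
  // The only thing a new 'pass through' pair needs: an entry in this map.
  private val passThrough: Map[Class[_], Class[_]] = Map(
    classOf[LScan]   -> classOf[PScan],
    classOf[LFilter] -> classOf[PFilter]
  )

  def plan(logical: Logical): Physical = {
    val physicalClass = passThrough(logical.getClass)
    val ctor = physicalClass.getConstructors.head
    val args = logical.productIterator.map {
      case child: Logical => plan(child)        // stands in for planLater(child)
      case other          => other.asInstanceOf[AnyRef]
    }.toArray[AnyRef]
    ctor.newInstance(args: _*).asInstanceOf[Physical]
  }

  def main(args: Array[String]): Unit =
    println(plan(LFilter("value > 1", LScan("events"))))  // PFilter(value > 1,PScan(events))
}
```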

## How was this patch tested?

Unit tests on existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24336

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21388.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21388


commit 4c2f700a5d7944c13681581df7379e9653c5d588
Author: Jungtaek Lim 
Date:   2018-05-21T23:31:26Z

SPARK-24336 Support 'pass through' transformation in BasicOperators




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...

2018-05-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21388
  
Thanks @HyukjinKwon for reviewing. Addressed review comments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...

2018-05-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21388
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...

2018-05-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21388
  
@hvanhovell 
I also think some people might not want reflection magic here (I was one of 
them, but came to think I should try it), so I'm happy to close the PR if 
others voice the same opinion.

For me, reflection looks like the only way to answer `Can we automate these 
'pass through' operations?`, so if we decide to reject the approach, we might 
be better off either removing that line or adding a description of the 
restriction(s) instead, unless we have another immediate idea for achieving it 
without reflection.

Btw, I'd be very happy if you could spend some time explaining which points 
make you concerned about reflection in the planner. Adding that description 
explicitly would help contributors avoid a similar attempt and save our time.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190129892
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived =
+mutable.Map.empty[Int, Boolean].withDefaultValue(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
+  private val completion = new 
ExecutorCompletionService[UnsafeRowReceiverMessage](executor)
+
+  private def completionTask(writerId: Int) = new 
Callable[UnsafeRowReceiverMessage] {
+override def call(): UnsafeRowReceiverMessage = 
queues(writerId).take()
   }
 
-  override def close(): Unit = {}
+  // Initialize by submitting tasks to read the first row from each 
writer.
+  (0 until numShuffleWriters).foreach(writerId => 
completion.submit(completionTask(writerId)))
+
+  /**
+   * In each call to getNext(), we pull the next row available in the 
completion queue, and then
+   * submit another task to read the next row from the writer which 
returned it.
+   *
+   * When a writer sends an epoch marker, we note that it's finished 
and don't submit another
+   * task for it in this epoch. The iterator is over once all writers 
have sent an epoch marker.
+   */
+  override def getNext(): UnsafeRow = {
+var nextRow: UnsafeRow = null
+while (nextRow == null) {
+  nextRow = completion.poll(checkpointIntervalMs, 
TimeUnit.MILLISECONDS) match {
+case null =>
+  // Try again if the poll didn't wait long enough to get a 
real result.
+  // But we should be getting at least an epoch marker every 
checkpoint interval.
+  logWarning(
+s"Completion service failed to make progress after 
$checkpointIntervalMs ms")
+  null
+
+// The completion service guarantees this future will be 
available immediately.
+case future => future.get() match {
+  case ReceiverRow(writerId, r) =>
+// Start reading the next element in the queue we just 
took from.
+completion.submit(completionTask(writerId))
+r
+  // TODO use writerId
--- End diff --

It looks like it is not needed as of now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190125731
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived =
+mutable.Map.empty[Int, Boolean].withDefaultValue(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --

As I commented earlier in the design doc, I was in favor of a single queue, 
because I thought it minimizes the thread count, which may avoid unnecessary 
contention (as well as code complexity in this case), and also makes the 
backpressure condition fairly simple (RPC requests can block indefinitely 
unless the queue has room to write).

But as I read some articles comparing `multiple writers, single reader on a 
single queue` vs `single writer, single reader on multiple queues per writer`, 
unless we introduce a highly-optimized queue like the Disruptor, the second 
approach looks like it performs better.

So the approach looks great to me for now, and at the least we could consider 
replacing this with a dedicated queue library if we run into problems.
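
As a side note, the JDK's ExecutorCompletionService already expresses this 
"single reader over per-writer queues" pattern quite compactly. A standalone, 
illustrative-only sketch (toy row type and sizes, not the patch itself):

```
import java.util.concurrent.{ArrayBlockingQueue, Callable, ExecutorCompletionService, Executors}

// One bounded queue per writer, one outstanding "take" task per queue, and a single
// reader draining whichever queue produced data first.
object PerWriterQueuesSketch {
  def main(args: Array[String]): Unit = {
    val numWriters = 3
    val queues = Array.fill(numWriters)(new ArrayBlockingQueue[String](16))
    val executor = Executors.newFixedThreadPool(numWriters)
    val completion = new ExecutorCompletionService[(Int, String)](executor)

    def takeFrom(writerId: Int) = new Callable[(Int, String)] {
      override def call(): (Int, String) = (writerId, queues(writerId).take())
    }

    (0 until numWriters).foreach(id => completion.submit(takeFrom(id)))

    // Simulated writers: each blocks only on its own queue, so a slow writer
    // cannot block a fast one at enqueue time.
    queues(1).put("row-from-writer-1")
    queues(0).put("row-from-writer-0")

    for (_ <- 0 until 2) {
      val (writerId, row) = completion.take().get()
      println(s"reader got '$row' from writer $writerId")
      completion.submit(takeFrom(writerId)) // resume reading from that writer
    }
    executor.shutdownNow()
  }
}
```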


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190120836
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived =
--- End diff --

The map will always contain `(writerId, true)`, so the value is not needed at 
all, and we only care about writer IDs in the range 0 until numShuffleWriters, 
so it might be better to consider an alternative as well.

Looks like this could also be a Set pre-initialized with 0 until 
numShuffleWriters, where we remove an element when we receive the marker. If 
the element is still in the set, it means we haven't received the marker from 
that writer yet.

In a similar vein, it could be a pre-initialized Array of Boolean with 
true/false values.
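
A tiny sketch of what I mean by the Set alternative (illustrative names only, 
not the patch):

```
import scala.collection.mutable

// Track only the writers we are still waiting on; the epoch is aligned for this
// reader once the set becomes empty.
object EpochMarkerTrackingSketch {
  val numShuffleWriters = 4
  val writersAwaitingMarker: mutable.Set[Int] = mutable.Set((0 until numShuffleWriters): _*)

  def onEpochMarker(writerId: Int): Unit = writersAwaitingMarker -= writerId
  def epochFinished: Boolean = writersAwaitingMarker.isEmpty

  def main(args: Array[String]): Unit = {
    (0 until numShuffleWriters).foreach(onEpochMarker)
    println(s"all markers received: $epochFinished")  // true
  }
}
```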


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...

2018-05-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21385#discussion_r190131693
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala
 ---
@@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver(
 
   override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
 case r: UnsafeRowReceiverMessage =>
-  queue.put(r)
+  queues(r.writerId).put(r)
   context.reply(())
   }
 
   override def read(): Iterator[UnsafeRow] = {
 new NextIterator[UnsafeRow] {
-  override def getNext(): UnsafeRow = queue.take() match {
-case ReceiverRow(r) => r
-case ReceiverEpochMarker() =>
-  finished = true
-  null
+  // An array of flags for whether each writer ID has gotten an epoch 
marker.
+  private val writerEpochMarkersReceived =
+mutable.Map.empty[Int, Boolean].withDefaultValue(false)
+
+  private val executor = 
Executors.newFixedThreadPool(numShuffleWriters)
--- End diff --

And I'm also now seeing this approach as an alternative way to deal with 
alignment (not buffering rows explicitly, but simply not reading after an epoch 
marker comes in). Nice approach.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21445: [SPARK-24404][SS] Increase currentEpoch when meet a Epoc...

2018-05-28 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21445
  
@LiangchangZ 
Looks like the patch is needed only together with #21353 #21332 #21293 as of 
now, right? If so, please state the condition in the JIRA issue description as 
well as the PR description so that we don't get confused.

There's a case where the reader and the writer are composed together in one 
task (the current state of continuous processing), and after this patch there 
would be two places which increase the epoch for the same thread. Please note 
that I'm commenting on top of the current implementation, not considering 
#21353 #21332 #21293.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21445: [SPARK-24404][SS] Increase currentEpoch when meet a Epoc...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21445
  
@LiangchangZ 

> In the real CP situation, reader and writer may be always in different 
tasks, right?

Continuous mode already supports some valid use cases, and putting everything 
in one task would be fastest for those use cases, though tasks can still be 
parallelized by partition. Unless we have a valid reason to separate the reader 
and the writer even in a non-shuffle query, it would be better to keep it as it is.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191605388
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala
 ---
@@ -58,39 +46,29 @@ class ContinuousShuffleReadSuite extends StreamTest {
 super.afterEach()
   }
 
-  test("receiver stopped with row last") {
-val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
-send(
-  endpoint,
-  ReceiverEpochMarker(0),
-  ReceiverRow(0, unsafeRow(111))
-)
+  private implicit def unsafeRow(value: Int) = {
--- End diff --

Just curious: is there a reason to rearrange these functions, this one and the 
two below? They look the same except for making this function `implicit`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191629272
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala
 ---
@@ -288,4 +267,153 @@ class ContinuousShuffleReadSuite extends StreamTest {
 val thirdEpoch = rdd.compute(rdd.partitions(0), 
ctx).map(_.getUTF8String(0).toString).toSet
 assert(thirdEpoch == Set("writer1-row1", "writer2-row0"))
   }
+
+  test("one epoch") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+writer.write(Iterator(1, 2, 3))
+
+assert(readEpoch(reader) == Seq(1, 2, 3))
+  }
+
+  test("multiple epochs") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+writer.write(Iterator(1, 2, 3))
+writer.write(Iterator(4, 5, 6))
+
+assert(readEpoch(reader) == Seq(1, 2, 3))
+assert(readEpoch(reader) == Seq(4, 5, 6))
+  }
+
+  test("empty epochs") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+writer.write(Iterator())
+writer.write(Iterator(1, 2))
+writer.write(Iterator())
+writer.write(Iterator())
+writer.write(Iterator(3, 4))
+writer.write(Iterator())
+
+assert(readEpoch(reader) == Seq())
+assert(readEpoch(reader) == Seq(1, 2))
+assert(readEpoch(reader) == Seq())
+assert(readEpoch(reader) == Seq())
+assert(readEpoch(reader) == Seq(3, 4))
+assert(readEpoch(reader) == Seq())
+  }
+
+  test("blocks waiting for writer") {
+val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions 
= 1)
+val writer = new RPCContinuousShuffleWriter(
+  0, new HashPartitioner(1), Array(readRDDEndpoint(reader)))
+
+val readerEpoch = reader.compute(reader.partitions(0), ctx)
+
+val readRowThread = new Thread {
+  override def run(): Unit = {
+assert(readerEpoch.toSeq.map(_.getInt(0)) == Seq(1))
+  }
+}
+readRowThread.start()
+
+eventually(timeout(streamingTimeout)) {
+  assert(readRowThread.getState == Thread.State.TIMED_WAITING)
+}
+
+// Once we write the epoch the thread should stop waiting and succeed.
+writer.write(Iterator(1))
+readRowThread.join()
+  }
+
+  test("multiple writer partitions") {
--- End diff --

Would we want to have another test which covers out-of-order epochs between 
writers (if that's a valid case for us), or should we rely on the test in 
ContinuousShuffleReadRDD?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...

2018-05-29 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21428#discussion_r191629554
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala
 ---
@@ -58,39 +46,29 @@ class ContinuousShuffleReadSuite extends StreamTest {
 super.afterEach()
   }
 
-  test("receiver stopped with row last") {
-val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-val endpoint = 
rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
-send(
-  endpoint,
-  ReceiverEpochMarker(0),
-  ReceiverRow(0, unsafeRow(111))
-)
+  private implicit def unsafeRow(value: Int) = {
--- End diff --

And where is the `implicit` attribute of this method actually leveraged? I'm 
not sure it is really needed, but I'm reviewing on the GitHub page so I might 
be missing something here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total size of states in ...

2018-05-31 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21469

[SPARK-24441][SS] Expose total size of states in HDFSBackedStateStore…

…Provider

## What changes were proposed in this pull request?

This patch exposes the estimated size of the cache (loadedMaps) in 
HDFSBackedStateStoreProvider as a custom metric of StateStore. While it refers 
to loadedMaps directly, there would be only one StateStoreWriter referring to a 
given StateStoreProvider, so the value is neither exposed nor aggregated 
multiple times. The current state metrics are safe to aggregate for the same reason.
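
The measurement itself essentially boils down to a SizeEstimator call over the 
provider's version cache; a rough, illustrative-only sketch (the custom-metric 
plumbing and the real type of loadedMaps are omitted here):

```
import org.apache.spark.util.SizeEstimator

// Illustrative only: estimate the retained size of the version -> state-map cache.
// `loadedMaps` here is a stand-in for the provider's internal cache.
object StateCacheSizeSketch {
  def estimatedCacheSizeBytes(loadedMaps: java.util.Map[Long, AnyRef]): Long =
    SizeEstimator.estimate(loadedMaps)
}
```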

## How was this patch tested?

Tested manually. Below is a snapshot of the UI page reflecting the patch:

https://user-images.githubusercontent.com/1317309/40788976-4ad93d8c-652c-11e8-88f1-337be5162588.png


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24441

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21469.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21469


commit dc11338650842a246a4bce9280d130607ceca281
Author: Jungtaek Lim 
Date:   2018-05-31T14:38:00Z

[SPARK-24441][SS] Expose total size of states in 
HDFSBackedStateStoreProvider

* expose estimation of size of cache (loadMaps) in 
HDFSBackedStateStoreProvider
  * as a custom metric of StateStore




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...

2018-05-31 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
cc. @tdas @jose-torres @jerryshao @HyukjinKwon @arunmahadevan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...

2018-05-31 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
Thanks @HyukjinKwon for reviewing. Addressed the PR title as well as fixing the nit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...

2018-06-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@jose-torres 
Ah yes, I forgot that a shallow copy is taking place, so while the new map 
holds its own map entries, the row objects are shared across versions. Thanks 
for pointing it out. Will update the description.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-04 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@arunmahadevan 
I didn't add the metric to StateOperatorProgress because this behavior is 
specific to HDFSBackedStateStoreProvider (though it is the only implementation 
available in Apache Spark), so I'm not sure this metric can be treated as a 
general one. (@tdas what do you think about this?)

Btw, the cache is only cleaned up when the maintenance operation runs, so there 
could be more than 100 versions in the map. Not sure why it shows 150x, but I 
couldn't find a missing spot in the patch. Maybe the issue comes from 
SizeEstimator.estimate()?

One thing we need to check is how SizeEstimator.estimate() calculates memory 
usage when UnsafeRow objects are shared across versions. If SizeEstimator adds 
the size of an object every time it is referenced, it will report much higher 
memory usage than the actual usage.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-04 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21477
  
Thanks @HyukjinKwon for cc'ing me. I haven't covered the Python part of 
structured streaming, so it will take some time to go through the code. Hoping 
I can participate in the review in time.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
Looks like the size is added only once for the same identity in 
SizeEstimator.estimate(), so SizeEstimator.estimate() is working correctly in 
this case. There might be other cases to consider, but I'm not sure.
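
A quick, illustrative check of that identity behavior (assuming 
org.apache.spark.util.SizeEstimator is on the classpath; not part of the patch):

```
import org.apache.spark.util.SizeEstimator

object SizeEstimatorIdentityCheck {
  def main(args: Array[String]): Unit = {
    val shared = Array.fill(1024)(0L)
    val twoReferences = Seq(shared, shared)          // same object referenced twice
    val twoCopies     = Seq(shared, shared.clone())  // two distinct objects

    // If identity is respected, the shared case is counted roughly once.
    println(SizeEstimator.estimate(twoReferences))
    println(SizeEstimator.estimate(twoCopies))
    assert(SizeEstimator.estimate(twoReferences) < SizeEstimator.estimate(twoCopies))
  }
}
```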


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-05 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21497

[SPARK-24466][SS] Fix TextSocketMicroBatchReader to be compatible with 
netcat again

## What changes were proposed in this pull request?

TextSocketMicroBatchReader was no longer compatible with netcat because it 
launched a temporary reader to read the schema, closed that reader, and then 
re-opened a new one. While a reliable socket server should be able to handle 
this without any issue, the nc command normally can't handle multiple 
connections and simply exits when the temporary reader is closed.

This patch fixes TextSocketMicroBatchReader to be compatible with netcat 
again by deferring opening the socket to the first call of 
planInputPartitions() instead of the constructor.
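
Just to illustrate the idea, a minimal sketch of the deferred connection 
(simplified interface and names, not the actual TextSocketMicroBatchReader 
code): the socket is opened only on first use, so the schema-only pass never 
opens a connection that nc would treat as its single client.

```
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

class DeferredSocketReader(host: String, port: Int) {
  @volatile private var socket: Socket = _
  @volatile private var reader: BufferedReader = _

  private def ensureConnected(): Unit = synchronized {
    if (socket == null) {
      // connect lazily, on the first real read
      socket = new Socket(host, port)
      reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    }
  }

  // analogous to planInputPartitions(): connect here, not in the constructor
  def readLine(): String = {
    ensureConnected()
    reader.readLine()
  }
}
```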

## How was this patch tested?

Added a unit test which fails on the current code and succeeds with the 
patch. Also manually tested.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24466

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21497.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21497


commit 7b875279742fa295ab513cf8a489830237953d0c
Author: Jungtaek Lim 
Date:   2018-06-05T07:57:42Z

SPARK-24466 Fix TextSocketMicroBatchReader to be compatible with netcat 
again




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
Failing tests were as follows:
* org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is 
a sbt.testing.NestedSuiteSelector)
* org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is 
a sbt.testing.NestedSuiteSelector)

Test failures are unrelated to the patch.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
Also added a custom metric for the count of versions stored in loadedMaps.

This is a new screenshot:
https://user-images.githubusercontent.com/1317309/40978481-b46ad324-690e-11e8-9b0f-e80528612a62.png



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
cc. @tdas @jose-torres @jerryshao @arunmahadevan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
@arunmahadevan 
Yes, before the patch Spark connects to the socket server twice: once for 
getting the schema, and once more for reading data.

And the `-k` flag is only supported by specific distributions, which is why I 
had to set a breakpoint and start nc again after the temporary reader was 
stopped.

For example, in my local dev environment (macOS 10.12.6), netcat doesn't 
support the -k flag.

```
netcat (The GNU Netcat) 0.7.1
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
I agree that the current cache approach may consume excessive memory 
unnecessarily, which also matches my finding in #21469.

The issue is not that simple, however, because in micro-batch mode each 
batch has to read the previous version of state; otherwise it has to read it 
from the file system, in the worst case seeking and reading multiple files in a 
remote file system. So the previous version of state should ideally stay 
available in memory.

There are three cases here (please add any I'm missing):

1. fail before commit
2. committed but batch failed afterwards
3. committed and batch succeeds

It might be better to think about all of these cases.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21497#discussion_r193277616
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -35,10 +34,11 @@ import 
org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, 
MicroBatchReadSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructField, StructType, 
TimestampType}
 
+
--- End diff --

Thanks for letting me know. Addressed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193289099
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
+file.write("%s\n" % str(event))
+file.close()
+
+def stop_all(self):
+for q in self.spark._wrapped.streams.active:
+q.stop()
+
+def __getstate__(self):
+return (self.open_events_dir, self.process_events_dir, 
self.close_events_dir)
+
+def __setstate__(self, state):
+self.open_events_dir, self.process_events_dir, 
self.close_events_dir = state
+
+def test_streaming_foreach_with_simple_function(self):
+tester = self.ForeachWriterTester(self.spark)
+
+def foreach_func(row):
+tester.write_process_event(row)
+
+tester.run_streaming_query_on_writer(foreach_func, 2)
+self.assertEqual(len(tester.process_events()), 2)
+
+def test_streaming_foreach_with_basic_open_process_close(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+return True
+
+def process(self, row):
+tester.write_process_event(row)
+
+def close(self, error):
+tester.write_close_event(error)
+
+tester.run_streaming_query_on_writer(ForeachWriter(), 2)
+
+open_events = tester.open_events()
+self.assertEqual(len(open_events), 2)
+self.assertSetEqual(set([e['epoch'] for e in open_events]), {0, 1})
+
+self.assertEqual(len(tester.process_events()), 2)
+
+close_events = tester.close_events()
+self.assertEqual(len(close_events), 2)
+self.assertSetEqual(set([e['error'] for e in close_events]), 
{'None'})
+
+def test_streaming_foreach_with_open_returning_false(self):
+tester = self.ForeachWriterTester(self.spark)
+
+class ForeachWriter:
+def open(self, partitionId, epochId):
+tester.write_open_event(partitionId, epochId)
+

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193284293
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
--- End diff --

nit: deserialized` `copy (space)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193289567
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -20,10 +20,48 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.InterfaceStability
 
 /**
- * A class to consume data generated by a `StreamingQuery`. Typically this 
is used to send the
- * generated data to external systems. Each partition will use a new 
deserialized instance, so you
- * usually should do all the initialization (e.g. opening a connection or 
initiating a transaction)
- * in the `open` method.
+ * The abstract class for writing custom logic to process data generated 
by a query.
+ * This is often used to write the output of a streaming query to 
arbitrary storage systems.
--- End diff --

Looks like the doc is duplicated between `foreach()` and `ForeachWriter`. I'm 
not sure how we can put a reference in the Python doc instead of duplicating 
the content, but even if the Python doc doesn't support such a reference, some 
of this content seems like it belongs in one place or the other, not both.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193291809
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.File
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.api.python._
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{NextIterator, Utils}
+
+class PythonForeachWriter(func: PythonFunction, schema: StructType)
+  extends ForeachWriter[UnsafeRow] {
+
+  private lazy val context = TaskContext.get()
+  private lazy val buffer = new PythonForeachWriter.UnsafeRowBuffer(
+context.taskMemoryManager, new 
File(Utils.getLocalDir(SparkEnv.get.conf)), schema.fields.length)
+  private lazy val inputRowIterator = buffer.iterator
+
+  private lazy val inputByteIterator = {
+EvaluatePython.registerPicklers()
+val objIterator = inputRowIterator.map { row => 
EvaluatePython.toJava(row, schema) }
+new SerDeUtil.AutoBatchedPickler(objIterator)
+  }
+
+  private lazy val pythonRunner = {
+val conf = SparkEnv.get.conf
+val bufferSize = conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true)
+PythonRunner(func, bufferSize, reuseWorker)
+  }
+
+  private lazy val outputIterator =
+pythonRunner.compute(inputByteIterator, context.partitionId(), context)
+
+  override def open(partitionId: Long, version: Long): Boolean = {
+outputIterator  // initialize everything
+TaskContext.get.addTaskCompletionListener { _ => buffer.close() }
+true
+  }
+
+  override def process(value: UnsafeRow): Unit = {
+buffer.add(value)
+  }
+
+  override def close(errorOrNull: Throwable): Unit = {
+buffer.allRowsAdded()
+if (outputIterator.hasNext) outputIterator.next() // to throw python 
exception if there was one
+  }
+}
+
+object PythonForeachWriter {
+
+  /**
+   * A buffer that is designed for the sole purpose of buffering 
UnsafeRows in PythonForeahWriter.
--- End diff --

nit: PythonForeachWriter


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193286066
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193286932
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self):
 finally:
 q.stop()
 shutil.rmtree(tmpPath)
+'''
 
+class ForeachWriterTester:
+
+def __init__(self, spark):
+self.spark = spark
+self.input_dir = tempfile.mkdtemp()
+self.open_events_dir = tempfile.mkdtemp()
+self.process_events_dir = tempfile.mkdtemp()
+self.close_events_dir = tempfile.mkdtemp()
+
+def write_open_event(self, partitionId, epochId):
+self._write_event(
+self.open_events_dir,
+{'partition': partitionId, 'epoch': epochId})
+
+def write_process_event(self, row):
+self._write_event(self.process_events_dir, {'value': 'text'})
+
+def write_close_event(self, error):
+self._write_event(self.close_events_dir, {'error': str(error)})
+
+def write_input_file(self):
+self._write_event(self.input_dir, "text")
+
+def open_events(self):
+return self._read_events(self.open_events_dir, 'partition INT, 
epoch INT')
+
+def process_events(self):
+return self._read_events(self.process_events_dir, 'value 
STRING')
+
+def close_events(self):
+return self._read_events(self.close_events_dir, 'error STRING')
+
+def run_streaming_query_on_writer(self, writer, num_files):
+try:
+sdf = 
self.spark.readStream.format('text').load(self.input_dir)
+sq = sdf.writeStream.foreach(writer).start()
+for i in range(num_files):
+self.write_input_file()
+sq.processAllAvailable()
+sq.stop()
+finally:
+self.stop_all()
+
+def _read_events(self, dir, json):
+rows = self.spark.read.schema(json).json(dir).collect()
+dicts = [row.asDict() for row in rows]
+return dicts
+
+def _write_event(self, dir, event):
+import random
+file = open(os.path.join(dir, str(random.randint(0, 10))), 
'w')
--- End diff --

A `with` statement might be more convenient here, along with renaming `file` to 
`f` or `fw` or so. Please ignore this if there's a specific reason not to use a 
`with` statement.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193284839
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
--- End diff --

> any initialization for writing data (e.g. opening a connection or 
starting a transaction) be done open after the `open(...)` method has been 
called

`be done open` seems a bit odd. It would read better if we polished the 
sentence.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193285667
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (

[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-05 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193304316
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala 
---
@@ -20,10 +20,48 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.InterfaceStability
 
 /**
- * A class to consume data generated by a `StreamingQuery`. Typically this 
is used to send the
- * generated data to external systems. Each partition will use a new 
deserialized instance, so you
- * usually should do all the initialization (e.g. opening a connection or 
initiating a transaction)
- * in the `open` method.
+ * The abstract class for writing custom logic to process data generated 
by a query.
+ * This is often used to write the output of a streaming query to 
arbitrary storage systems.
--- End diff --

Ah yes, my bad. I confused this with the Python doc.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21497#discussion_r193372564
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -256,6 +246,66 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
 }
   }
 
+  test("verify ServerThread only accepts the first connection") {
+serverThread = new ServerThread()
+serverThread.start()
+
+withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> 
"false") {
--- End diff --

Yeah, actually I just blindly copied the existing line in the file. Agreed it 
would be better to use the key.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21497#discussion_r193374662
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
 ---
@@ -256,6 +246,66 @@ class TextSocketStreamSuite extends StreamTest with 
SharedSQLContext with Before
 }
   }
 
+  test("verify ServerThread only accepts the first connection") {
+serverThread = new ServerThread()
+serverThread.start()
+
+withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> 
"false") {
--- End diff --

Thanks for the guidance; addressed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@arunmahadevan 
Added the state store custom metrics to the streaming query status as well. 
You can see `providerLoadedMapSize` added to `stateOperators.customMetrics` in 
the output below.

I had to exclude `providerLoadedMapCountOfVersions` from the list, since the 
average metric is implemented in a slightly tricky way and doesn't look easy 
to aggregate for the streaming query status.
We may want to reimplement SQLMetric and its subclasses to make sure everything 
works correctly without any tricks, but that doesn't look trivial to address 
and I think it is out of scope for this PR.

```
18/06/06 22:51:23 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "7564a0b7-e3b2-4d53-b246-b774ab04e586",
  "runId" : "8dd34784-080c-4f86-afaf-ac089902252d",
  "name" : null,
  "timestamp" : "2018-06-06T13:51:15.467Z",
  "batchId" : 4,
  "numInputRows" : 547,
  "inputRowsPerSecond" : 67.15776550030694,
  "processedRowsPerSecond" : 65.94333936106088,
  "durationMs" : {
"addBatch" : 7944,
"getBatch" : 1,
"getEndOffset" : 0,
"queryPlanning" : 61,
"setOffsetRange" : 5,
"triggerExecution" : 8295,
"walCommit" : 158
  },
  "eventTime" : {
"avg" : "2018-06-06T13:51:10.313Z",
"max" : "2018-06-06T13:51:14.250Z",
"min" : "2018-06-06T13:51:07.098Z",
"watermark" : "2018-06-06T13:50:36.676Z"
  },
  "stateOperators" : [ {
"numRowsTotal" : 20,
"numRowsUpdated" : 16,
"memoryUsedBytes" : 26679,
"customMetrics" : {
  "providerLoadedMapSize" : 181911
}
  } ],
  "sources" : [ {
"description" : "KafkaV2[Subscribe[apachelogs-v2]]",
"startOffset" : {
  "apachelogs-v2" : {
"2" : 489056,
"4" : 489053,
"1" : 489055,
"3" : 489051,
"0" : 489053
  }
},
"endOffset" : {
  "apachelogs-v2" : {
"2" : 489056,
"4" : 489053,
"1" : 489055,
"3" : 489051,
"0" : 489053
  }
},
"numInputRows" : 547,
"inputRowsPerSecond" : 67.15776550030694,
"processedRowsPerSecond" : 65.94333936106088
  } ],
  "sink" : {
"description" : 
"org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@60999714"
  }
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21469#discussion_r193622940
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 ---
@@ -231,7 +231,7 @@ class StreamingQueryListenerSuite extends StreamTest 
with BeforeAndAfter {
   test("event ordering") {
 val listener = new EventCollector
 withListenerAdded(listener) {
-  for (i <- 1 to 100) {
+  for (i <- 1 to 50) {
--- End diff --

After the patch this test starts failing: it just means more time is needed 
to run this loop 100 times, not that the logic is broken. Decreasing the 
number works for me.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@TomaszGaweda @aalobaidi 
Please correct me if I'm missing something here.

At the start of every batch, the state store loads the previous version of 
state so that it can be read and written. If we unload all versions "after 
committing", the cache will no longer contain the previous version of state 
and it will have to load the state by reading files, adding huge latency to 
starting the batch. That's why I described the three cases before: to avoid 
loading state from files when starting a new batch.

Please apply #21469 manually and see how much memory 
HDFSBackedStateStoreProvider consumes due to storing multiple versions (it 
will show the state size of the latest version as well as the overall state 
size in the cache). Please also observe and provide latency numbers to show 
how much it is now and how much it would be after the patch. We always have to 
ask ourselves whether we are addressing the issue correctly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-06 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
Retaining versions of state is also relevant to snapshotting the latest 
version to files: HDFSBackedStateStoreProvider doesn't snapshot a version if 
it doesn't exist in loadedMaps. So we may want to check whether this option 
also works with the current snapshotting approach.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21477#discussion_r193740695
  
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, 
continuous=None):
 self._jwrite = self._jwrite.trigger(jTrigger)
 return self
 
+def foreach(self, f):
+"""
+Sets the output of the streaming query to be processed using the 
provided writer ``f``.
+This is often used to write the output of a streaming query to 
arbitrary storage systems.
+The processing logic can be specified in two ways.
+
+#. A **function** that takes a row as input.
+This is a simple way to express your processing logic. Note 
that this does
+not allow you to deduplicate generated data when failures 
cause reprocessing of
+some input data. That would require you to specify the 
processing logic in the next
+way.
+
+#. An **object** with a ``process`` method and optional ``open`` 
and ``close`` methods.
+The object can have the following methods.
+
+* ``open(partition_id, epoch_id)``: *Optional* method that 
initializes the processing
+(for example, open a connection, start a transaction, 
etc). Additionally, you can
+use the `partition_id` and `epoch_id` to deduplicate 
regenerated data
+(discussed later).
+
+* ``process(row)``: *Non-optional* method that processes each 
:class:`Row`.
+
+* ``close(error)``: *Optional* method that finalizes and 
cleans up (for example,
+close connection, commit transaction, etc.) after all rows 
have been processed.
+
+The object will be used by Spark in the following way.
+
+* A single copy of this object is responsible of all the data 
generated by a
+single task in a query. In other words, one instance is 
responsible for
+processing one partition of the data generated in a 
distributed manner.
+
+* This object must be serializable because each task will get 
a fresh
+serialized-deserializedcopy of the provided object. Hence, 
it is strongly
+recommended that any initialization for writing data (e.g. 
opening a
+connection or starting a transaction) be done open after 
the `open(...)`
+method has been called, which signifies that the task is 
ready to generate data.
+
+* The lifecycle of the methods are as follows.
+
+For each partition with ``partition_id``:
+
+... For each batch/epoch of streaming data with 
``epoch_id``:
+
+... Method ``open(partitionId, epochId)`` is called.
+
+... If ``open(...)`` returns true, for each row in the 
partition and
+batch/epoch, method ``process(row)`` is called.
+
+... Method ``close(errorOrNull)`` is called with error 
(if any) seen while
+processing rows.
+
+Important points to note:
+
+* The `partitionId` and `epochId` can be used to deduplicate 
generated data when
+failures cause reprocessing of some input data. This 
depends on the execution
+mode of the query. If the streaming query is being 
executed in the micro-batch
+mode, then every partition represented by a unique tuple 
(partition_id, epoch_id)
+is guaranteed to have the same data. Hence, (partition_id, 
epoch_id) can be used
+to deduplicate and/or transactionally commit data and 
achieve exactly-once
+guarantees. However, if the streaming query is being 
executed in the continuous
+mode, then this guarantee does not hold and therefore 
should not be used for
+deduplication.
+
+* The ``close()`` method (if exists) is will be called if 
`open()` method exists and
+returns successfully (irrespective of the return value), 
except if the Python
+crashes in the middle.
+
+.. note:: Evolving.
+
+>>> # Print every row using a function
+>>> writer = sdf.writeStream.foreach(lambda x: print(x))
+>>> # Print every row using a object with process() method
+>>> class RowPrinter:
+... def open(self, partition_id, epoch_id):
+... print("Opened %d, %d" % (

[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...

2018-06-07 Thread HeartSaVioR
GitHub user HeartSaVioR opened a pull request:

https://github.com/apache/spark/pull/21506

[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in 
HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch measures and logs the elapsed time for each operation that 
communicates with the file system (mostly remote HDFS in production) in 
HDFSBackedStateStoreProvider, to help investigate any latency issues.
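
A minimal sketch of this kind of measurement (illustrative names, not the 
actual helper added in the patch):

```
object ElapsedTimeSketch {
  // Wrap a filesystem operation, measure wall-clock time, and log it.
  def timeAndLog[T](opName: String)(body: => T): T = {
    val startTime = System.nanoTime()
    try {
      body
    } finally {
      val elapsedMs = (System.nanoTime() - startTime) / 1000000
      // the real patch routes this through Spark's logging, not println
      println(s"$opName took $elapsedMs ms")
    }
  }
}

// hypothetical usage around a state file read:
// val map = ElapsedTimeSketch.timeAndLog("reading delta file for version 10") {
//   readDeltaFile(version = 10)
// }
```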

## How was this patch tested?

Manually tested.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HeartSaVioR/spark SPARK-24485

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21506.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21506


commit d84f98fc978262f4165f78b3b223b8bb3151f735
Author: Jungtaek Lim 
Date:   2018-06-07T14:14:46Z

[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in 
HDFSBackedStateStoreProvider




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21506
  
There are plenty of other debug messages which might hide the log messages 
added by this patch. Would we want to log them at INFO instead of DEBUG?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@aalobaidi 
You can also merge #21506 (maybe changing the log level, or modifying the 
patch to log the messages at INFO level) and see the latencies of loading 
state, snapshotting, and cleaning up.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@aalobaidi 
One thing you may want to be aware of is that, from the executor's point of 
view, the executor must load at least one version of state in memory 
regardless of how many versions are cached. I guess you may get a better 
result if you unload the entire cache but leave the last version you just 
committed. A cache miss would then only occur in one of the three cases 
(`2. committed but batch failed afterwards`), which happens rarely, and that 
is still better than cache misses in two of the three cases (2 and 3).
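
To make the suggestion concrete, a rough sketch of what I mean (illustrative 
types and names, not the actual HDFSBackedStateStoreProvider code):

```
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object CacheEvictionSketch {
  // stand-in for the real in-memory state map type
  type StateMap = java.util.Map[String, Array[Byte]]

  // After committing `committedVersion`, drop every cached version except the
  // one just committed, so the next batch still finds its previous version in
  // memory.
  def evictAllButCommitted(
      loadedMaps: ConcurrentHashMap[Long, StateMap],
      committedVersion: Long): Unit = {
    loadedMaps.keySet().asScala.toList
      .filterNot(_ == committedVersion)
      .foreach(v => loadedMaps.remove(v))
  }
}
```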


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...

2018-06-07 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/21504#discussion_r193945288
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: 
SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
 
+  try {
+sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach 
{ classNames =>
+  Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+sparkSession.sparkContext.conf).foreach(listener => {
+addListener(listener)
+logInfo(s"Registered listener ${listener.getClass.getName}")
--- End diff --

Either debug or info is fine for me, since it would add just a couple of log 
lines, and only once.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-08 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21504
  
retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...

2018-06-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21497
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...

2018-06-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21504
  
Test failures were from Kafka.

retest this, please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore

2018-06-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21500
  
@aalobaidi 
When starting a batch, the latest version of state is read in order to start 
a new version of state. If the state has to be restored from a snapshot plus 
delta files, it will incur huge latency on restoring.
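
For context, a rough sketch of what a cache miss costs (hypothetical helper 
names, not the actual loading code in HDFSBackedStateStoreProvider): every 
snapshot or delta file read below may touch a remote filesystem.

```
object RestoreCostSketch {
  type StateMap = scala.collection.mutable.Map[String, Array[Byte]]

  // hypothetical stand-ins for the real snapshot/delta file readers
  def readSnapshotFile(version: Long): Option[StateMap] = None
  def applyDeltaFile(version: Long, into: StateMap): Unit = ()

  // Walk back until a snapshot is found, then replay every delta since it.
  def restoreVersion(version: Long): StateMap = {
    var snapshotVersion = version
    var base: Option[StateMap] = None
    while (snapshotVersion > 0 && base.isEmpty) {
      base = readSnapshotFile(snapshotVersion)
      if (base.isEmpty) snapshotVersion -= 1
    }
    val map = base.getOrElse(scala.collection.mutable.Map.empty[String, Array[Byte]])
    ((snapshotVersion + 1) to version).foreach(v => applyDeltaFile(v, map))
    map
  }
}
```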

#21506 logs messages when loading state requires dealing with the (remote) 
filesystem. That's why I suggest merging my patch and running your case again.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...

2018-06-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21469
  
@jose-torres No problem. I expected there would be some quiet moments in the 
Spark community during Spark Summit. Addressed the comment regarding renaming.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...

2018-06-09 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/spark/pull/21506
  
cc. @tdas @jose-torres @jerryshao @arunmahadevan @HyukjinKwon 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


