[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...

2018-11-29 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22952
  
> @zsxwing Btw, how do you think about addressing background move/deletion 
(I had thought and

Yeah, this can be done in a separate ticket.

I was playing with `org.apache.hadoop.fs.GlobFilter` to see how to detect the overlap. But one major issue is that, before we have the target path, we don't know whether a path will match [the glob pattern](https://self-learning-java-tutorial.blogspot.com/2016/01/hadoop-java-globbing.html) or not.

In the worst case, we can check the overlap when parsing the options for a normal path. For a glob path, we can use `GlobFilter`/`GlobPattern` to check right before doing the rename, at which point we already have the concrete target path.
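
For reference, a minimal sketch of that last check, assuming `org.apache.hadoop.fs.GlobPattern`; the helper name is hypothetical and a real check would still need qualified paths and careful glob semantics:

```scala
import org.apache.hadoop.fs.{GlobPattern, Path}

// Hypothetical helper: once the concrete archive target is known (right before
// the rename), test whether the source glob would pick that target up again.
def targetOverlapsSourceGlob(sourceGlobPattern: String, archiveTarget: Path): Boolean =
  new GlobPattern(sourceGlobPattern).matches(archiveTarget.toUri.getPath)
```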



---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237319928
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +289,65 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
--- End diff --

ditto


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237314690
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+cleanSource: option to clean up completed files after 
processing.
+Available options are "archive", "delete", "no_op". If the option 
is not provided, the default value is "no_op".
+When "archive" is provided, the additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of the source path, to ensure archived files are never included as new source files again.
+Spark will move source files while respecting their own paths. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
+NOTE: Both archiving (via moving) and deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost of listing source files, which is considered a heavy operation.
+NOTE 2: The source path should not be used from multiple queries 
when enabling this option, because source files will be moved or deleted which 
behavior may impact the other queries.
--- End diff --

NOTE 3: Both delete and move actions are best effort. Failing to delete or 
move files will not fail the streaming query.


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237319903
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +289,65 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
+  val curPathUri = curPath.toUri
+
+  val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+  try {
+logDebug(s"Creating directory if it doesn't exist 
${newPath.getParent}")
+if (!fs.exists(newPath.getParent)) {
+  fs.mkdirs(newPath.getParent)
+}
+
+logDebug(s"Archiving completed file $curPath to $newPath")
+fs.rename(curPath, newPath)
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being 
stopped
+  logWarning(s"Fail to move $curPath to $newPath / skip moving 
file.", e)
+  }
+}
+
+def remove(entry: FileEntry): Unit = {
+  val curPath = new Path(entry.path)
--- End diff --

`val curPath = new Path(new URI(entry.path))` to make it escape/unescape the path properly, since `entry.path` was created from `Path.toUri.toString`. Could you also add a unit test covering special paths such as `/a/b/a b%.txt`?
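
A minimal illustration of the escaping issue, using only a local-style path (the variable names are just for the example):

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// entry.path is produced via Path.toUri.toString, so special characters end up
// percent-encoded, e.g. "/a/b/a b%.txt" becomes something like "/a/b/a%20b%25.txt".
val encoded: String = new Path("/a/b/a b%.txt").toUri.toString

// Treating the encoded string as a plain path keeps the escapes literally in the
// file name, while going through URI first decodes them back to "a b%.txt".
val literalEscapes = new Path(encoded)
val decodedProperly = new Path(new URI(encoded))
```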


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237319176
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +289,65 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
+  val curPathUri = curPath.toUri
+
+  val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+  try {
+logDebug(s"Creating directory if it doesn't exist 
${newPath.getParent}")
+if (!fs.exists(newPath.getParent)) {
+  fs.mkdirs(newPath.getParent)
+}
+
+logDebug(s"Archiving completed file $curPath to $newPath")
+fs.rename(curPath, newPath)
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being 
stopped
+  logWarning(s"Fail to move $curPath to $newPath / skip moving 
file.", e)
+  }
+}
+
+def remove(entry: FileEntry): Unit = {
+  val curPath = new Path(entry.path)
+  try {
+logDebug(s"Removing completed file $curPath")
+fs.delete(curPath, false)
+  } catch {
+case NonFatal(e) =>
+  // Log to error but swallow exception to avoid process being 
stopped
+  logWarning(s"Fail to remove $curPath / skip removing file.", e)
+  }
+}
+
+val logOffset = FileStreamSourceOffset(end).logOffset
+metadataLog.get(logOffset) match {
--- End diff --

you can use `val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)` to take advantage of the underlying cache in `FileStreamSourceLog`.
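
A sketch of how `commit` could consume that, reusing the names from the diff above; everything else here is illustrative, not the final code:

```scala
// Entries for exactly this batch; the ranged get() is answered from
// FileStreamSourceLog's cache instead of re-reading the compacted log.
val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)

sourceOptions.cleanSource match {
  case CleanSourceMode.ARCHIVE =>
    files.foreach(move(_, baseArchiveDirPath)) // baseArchiveDirPath assumed resolved earlier
  case CleanSourceMode.DELETE =>
    files.foreach(remove)
  case CleanSourceMode.NO_OP => // nothing to clean up
}
```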


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237200636
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
 ---
@@ -74,6 +76,39 @@ class FileStreamOptions(parameters: 
CaseInsensitiveMap[String]) extends Logging
*/
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
 
+  /**
+   * The archive directory to move completed files. The option will be 
only effective when
+   * "cleanSource" is set to "archive".
+   *
+   * Note that the completed file will be moved to this archive directory 
with respecting to
+   * its own path.
+   *
+   * For example, if the path of source file is "/a/b/dataset.txt", and 
the path of archive
+   * directory is "/archived/here", file will be moved to 
"/archived/here/a/b/dataset.txt".
+   */
+  val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")
+
+  /**
+   * Defines how to clean up completed files. Available options are 
"archive", "delete", "no_op".
+   */
+  val cleanSource: CleanSourceMode.Value = {
+val modeStrOption = parameters.getOrElse("cleanSource", 
CleanSourceMode.NO_OP.toString)
--- End diff --

nit: could you add a method on `CleanSourceMode` to convert a string to `CleanSourceMode.Value`?
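
Something along these lines, for illustration only (the enum values come from the surrounding discussion, the method name is a guess):

```scala
object CleanSourceMode extends Enumeration {
  val ARCHIVE, DELETE, NO_OP = Value

  // Hypothetical helper: map the user-facing option string to an enum value,
  // defaulting to NO_OP when the option is absent.
  def fromString(mode: Option[String]): Value = mode.map { m =>
    values.find(_.toString.equalsIgnoreCase(m)).getOrElse(
      throw new IllegalArgumentException(s"Invalid mode for cleanSource option: $m"))
  }.getOrElse(NO_OP)
}
```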


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237315173
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -100,6 +101,36 @@ class FileStreamSource(
 
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = 
$maxFileAgeMs")
 
+  ensureNoOverlapBetweenSourceAndArchivePath()
--- End diff --

Could you do this check only when CleanSourceMode is `ARCHIVE`?
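
i.e. something like the following guard (sketch only):

```scala
// Only the archive mode needs the source/archive overlap validation.
if (sourceOptions.cleanSource == CleanSourceMode.ARCHIVE) {
  ensureNoOverlapBetweenSourceAndArchivePath()
}
```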


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237315718
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -257,16 +289,65 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in 
the future.
*/
   override def commit(end: Offset): Unit = {
-// No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
-// and the value of the maxFileAge parameter.
+def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
+  val curPath = new Path(entry.path)
+  val curPathUri = curPath.toUri
+
+  val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
+  try {
+logDebug(s"Creating directory if it doesn't exist 
${newPath.getParent}")
+if (!fs.exists(newPath.getParent)) {
+  fs.mkdirs(newPath.getParent)
+}
+
+logDebug(s"Archiving completed file $curPath to $newPath")
+fs.rename(curPath, newPath)
--- End diff --

It's better to also check the return value of `rename`. A user may reuse a 
source archive dir and cause path conflicts. We should also log this.
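
For example (a sketch reusing the names from the diff; `rename` semantics on conflicts vary by file system):

```scala
logDebug(s"Archiving completed file $curPath to $newPath")
// rename typically returns false instead of throwing when the target already
// exists, e.g. when a source archive dir is reused, so log that case explicitly.
if (!fs.rename(curPath, newPath)) {
  logWarning(s"Fail to move $curPath to $newPath / skip moving file.")
}
```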


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237320515
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -100,6 +101,36 @@ class FileStreamSource(
 
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = 
$maxFileAgeMs")
 
+  ensureNoOverlapBetweenSourceAndArchivePath()
+
+  private def ensureNoOverlapBetweenSourceAndArchivePath(): Unit = {
+@tailrec
+def removeGlob(path: Path): Path = {
+  if (path.getName.contains("*")) {
+removeGlob(path.getParent)
+  } else {
+path
+  }
+}
+
+sourceOptions.sourceArchiveDir match {
+  case None =>
+  case Some(archiveDir) =>
+val sourceUri = removeGlob(qualifiedBasePath).toUri
+val archiveUri = new Path(archiveDir).toUri
+
+val sourcePath = sourceUri.getPath
+val archivePath = archiveUri.getPath
--- End diff --

we need to use `fs.makeQualified` to turn all user-provided paths into absolute paths, as the user may just pass a relative path.
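
i.e. something like (sketch):

```scala
// Qualify both paths so that relative user input is resolved against the
// default file system / working directory before comparing prefixes.
val sourcePath = fs.makeQualified(removeGlob(qualifiedBasePath)).toUri.getPath
val archivePath = fs.makeQualified(new Path(archiveDir)).toUri.getPath
```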


---




[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...

2018-11-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22952#discussion_r237314459
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -530,6 +530,12 @@ Here are the details of all the sources in Spark.
 "s3://a/dataset.txt"
 "s3n://a/b/dataset.txt"
 "s3a://a/b/c/dataset.txt"
+cleanSource: option to clean up completed files after 
processing.
+Available options are "archive", "delete", "no_op". If the option 
is not provided, the default value is "no_op".
+When "archive" is provided, the additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of the source path, to ensure archived files are never included as new source files again.
+Spark will move source files while respecting their own paths. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
+NOTE: Both archiving (via moving) and deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost of listing source files, which is considered a heavy operation.
+NOTE 2: The source path should not be used from multiple queries 
when enabling this option, because source files will be moved or deleted which 
behavior may impact the other queries.
--- End diff --

NOTE 2: The source path should not be used from multiple **sources or** 
queries when enabling this option, because source files will be moved or 
deleted which behavior may impact the other **sources and** queries.


---




[GitHub] spark issue #23109: [SPARK-26069][TESTS][FOLLOWUP]Add another possible error...

2018-11-21 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/23109
  
cc @squito


---




[GitHub] spark pull request #23109: [SPARK-26069][TESTS][FOLLOWUP]Add another possibl...

2018-11-21 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/23109

[SPARK-26069][TESTS][FOLLOWUP]Add another possible error message

## What changes were proposed in this pull request?

`org.apache.spark.network.RpcIntegrationSuite.sendRpcWithStreamFailures` is still flaky; here is the error message:

```
sbt.ForkMain$ForkError: java.lang.AssertionError: Got a non-empty set 
[Failed to send RPC RPC 8249697863992194475 to /172.17.0.2:41177: 
java.io.IOException: Broken pipe]
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
org.apache.spark.network.RpcIntegrationSuite.assertErrorAndClosed(RpcIntegrationSuite.java:389)
at 
org.apache.spark.network.RpcIntegrationSuite.sendRpcWithStreamFailures(RpcIntegrationSuite.java:347)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at com.novocode.junit.JUnitRunner$1.execute(JUnitRunner.java:132)
at sbt.ForkMain$Run$2.call(ForkMain.java:296)
at sbt.ForkMain$Run$2.call(ForkMain.java:286)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```

This happened when the second RPC message was being sent but the connection 
was closed at the same time.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-26069-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23109


commit 91193e73c570158697150a35a8977c61d5c3f86b
Author: Shixiong Zhu 
Date:   2018-11-21T19:49:45Z

another possible error message




---




[GitHub] spark issue #23089: [SPARK-26120][TESTS][SS][SPARKR]Fix a streaming query le...

2018-11-19 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/23089
  
cc @felixcheung 


---




[GitHub] spark pull request #23089: [SPARK-26120][TESTS][SS][SPARKR]Fix a streaming q...

2018-11-19 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/23089

[SPARK-26120][TESTS][SS][SPARKR]Fix a streaming query leak in Structured 
Streaming R tests

## What changes were proposed in this pull request?

Stop the streaming query in `Specify a schema by using a DDL-formatted 
string when reading` to avoid outputting annoying logs.

## How was this patch tested?

Jenkins


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-26120

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23089.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23089


commit 3353bb1db87cc2f51545b28fcc2f83e6037eb1c3
Author: Shixiong Zhu 
Date:   2018-11-19T19:50:20Z

Fix a streaming query leak in Structured Streaming R tests




---




[GitHub] spark issue #23060: [SPARK-26092][SS]Use CheckpointFileManager to write the ...

2018-11-16 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/23060
  
Thanks! Merging to master and 2.4.


---




[GitHub] spark issue #23034: [SPARK-26035][PYTHON] Break large streaming/tests.py fil...

2018-11-16 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/23034
  
LGTM. By the way, @HyukjinKwon, I totally understand that this PR needs to be merged soon to avoid conflicts. But could you please at least get someone to review and sign off before merging next time? Thanks!


---




[GitHub] spark pull request #23060: [SPARK-26092][SS]Use CheckpointFileManager to wri...

2018-11-16 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/23060

[SPARK-26092][SS]Use CheckpointFileManager to write the streaming metadata 
file

## What changes were proposed in this pull request?

Use CheckpointFileManager to write the streaming `metadata` file so that 
the `metadata` file will never be a partial file.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-26092

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23060.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23060


commit 4cb6abe5cb44692075841c446aca8d9cf12b8968
Author: Shixiong Zhu 
Date:   2018-11-16T06:26:43Z

Use CheckpointFileManager to write the streaming metadata file




---




[GitHub] spark issue #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegrationSuite....

2018-11-16 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/23041
  
Thanks. Merging to master and 2.4.


---




[GitHub] spark issue #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegrationSuite....

2018-11-15 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/23041
  
retest this please


---




[GitHub] spark pull request #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegratio...

2018-11-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/23041#discussion_r233734198
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
 ---
@@ -371,23 +371,29 @@ private void assertErrorsContain(Set errors, 
Set contains) {
 
   private void assertErrorAndClosed(RpcResult result, String 
expectedError) {
 assertTrue("unexpected success: " + result.successMessages, 
result.successMessages.isEmpty());
-// we expect 1 additional error, which contains *either* "closed" or 
"Connection reset"
+// we expect 1 additional error, which should contain one of the 
follow messages:
+// - "closed"
+// - "Connection reset"
+// - "java.nio.channels.ClosedChannelException"
 Set errors = result.errorMessages;
 assertEquals("Expected 2 errors, got " + errors.size() + "errors: " +
 errors, 2, errors.size());
 
 Set containsAndClosed = Sets.newHashSet(expectedError);
 containsAndClosed.add("closed");
 containsAndClosed.add("Connection reset");
+containsAndClosed.add("java.nio.channels.ClosedChannelException");
 
 Pair, Set> r = checkErrorsContain(errors, 
containsAndClosed);
 
-Set errorsNotFound = r.getRight();
-assertEquals(1, errorsNotFound.size());
-String err = errorsNotFound.iterator().next();
-assertTrue(err.equals("closed") || err.equals("Connection reset"));
+assertTrue("Got a non-empty set " + r.getLeft(), 
r.getLeft().isEmpty());
--- End diff --

Moved this check here so that we can see which error caused the test failure.


---




[GitHub] spark issue #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegrationSuite....

2018-11-14 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/23041
  
cc @squito


---




[GitHub] spark pull request #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegratio...

2018-11-14 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/23041

[SPARK-26069][TESTS]Fix flaky test: 
RpcIntegrationSuite.sendRpcWithStreamFailures

## What changes were proposed in this pull request?

The test failure is because `assertErrorAndClosed` misses one possible 
error message: `java.nio.channels.ClosedChannelException`. This happens when 
the second `uploadStream` is called after the channel has been closed. This can 
be reproduced by adding `Thread.sleep(1000)` below this line: 
https://github.com/apache/spark/blob/03306a6df39c9fd6cb581401c13c4dfc6bbd632e/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java#L217

This PR fixes the above issue and also improves the test failure messages 
of `assertErrorAndClosed`.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-26069

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23041.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23041


commit 6bebcb5e004ed4b434c550d26ed1a922d13e0446
Author: Shixiong Zhu 
Date:   2018-11-15T07:16:00Z

fix test




---




[GitHub] spark issue #23023: [SPARK-26042][SS][TESTS]Fix a potential hang in KafkaCon...

2018-11-14 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/23023
  
Thanks! Merging to master and 2.4.


---




[GitHub] spark issue #23023: [SPARK-26042][SS][TESTS]Fix a potential hang in KafkaCon...

2018-11-13 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/23023
  
cc @jose-torres 


---




[GitHub] spark pull request #23023: [SPARK-26042][SS][TESTS]Fix a potential hang in K...

2018-11-13 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/23023

[SPARK-26042][SS][TESTS]Fix a potential hang in 
KafkaContinuousSourceTopicDeletionSuite

## What changes were proposed in this pull request?

Because initializing lazy vals shares the same lock, a thread that tries to initialize `executedPlan` while `isRDD` is running will hang forever.

This PR materializes `executedPlan` eagerly so that accessing it while `toRdd` is running doesn't need to wait for the lock.
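
For context, a standalone illustration of the shared lazy-val lock (not the Spark code): in Scala 2.12, initializing any lazy val of an object synchronizes on that object, so one slow initialization blocks the others.

```scala
class Holder {
  // Initializing either lazy val takes the monitor of `this`, so while `slow`
  // is being initialized, a thread touching `fast` waits for the same lock.
  lazy val slow: Int = { Thread.sleep(10000); 1 }
  lazy val fast: Int = 2
}

val h = new Holder
new Thread(() => h.slow).start() // holds the lock for ~10 seconds
Thread.sleep(100)
println(h.fast)                  // blocks until `slow` finishes initializing
```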

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-26042

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23023.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23023


commit 33b51136e6292db8236c6d39d662887d18a9534d
Author: Shixiong Zhu 
Date:   2018-11-13T17:42:08Z

Fix a potential hang in KafkaContinuousSourceTopicDeletionSuite




---




[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...

2018-11-12 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22952
  
> Provide additional option: delete (two options - 'rename' / 'delete' - 
are mutually exclusive)
> 
> Actually the actions end users are expected to take are 1. moving to 
archive directory (with compression or not) 2. delete periodically. If 
moving/renaming require non-trivial cost, end users may want to just delete 
files directly without backing up.

+1 for this approach. The file listing cost is huge when the directory has a lot of files, and I think one of the goals of this feature is reducing that cost. Hence either deleting the files or moving them to a different directory should be fine. Also, could you try to make one simple option for `rename/delete`, such as `cleanSource` -> (`none`, `rename` or `delete`)? When the user picks `rename`, they should be able to set the archive directory using another option.

In addition, it would be great if we could document that, whenever this option is used, the same directory should not be used by multiple queries.
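
A sketch of what that could look like from the user side, under the option names suggested above (illustrative only, not the final API):

```scala
// Hypothetical option names following the suggested single cleanSource knob.
val input = spark.readStream
  .format("text")
  .option("cleanSource", "rename")           // one of: none, rename, delete
  .option("sourceArchiveDir", "/archived")   // only consulted when cleanSource = rename
  .load("/data/incoming")
```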


---




[GitHub] spark issue #22923: [SPARK-25910][CORE] accumulator updates from previous st...

2018-11-04 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22923
  
We need to always update user accumulators. Right now such task metrics just cause some annoying error logs, which doesn't seem worth fixing.


---




[GitHub] spark pull request #22910: [SPARK-25899][TESTS]Fix flaky CoarseGrainedSchedu...

2018-10-31 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22910

[SPARK-25899][TESTS]Fix flaky CoarseGrainedSchedulerBackendSuite

## What changes were proposed in this pull request?

I saw CoarseGrainedSchedulerBackendSuite fail in my PR and finally reproduced the following error on a very busy machine:
```
sbt.ForkMain$ForkError: 
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to 
eventually never returned normally. Attempted 400 times over 10.00982864399 
seconds. Last failure message: ArrayBuffer("2", "0", "3") had length 3 instead 
of expected length 4.
```

The logs in this test shows executor 1 was not up when the test failed.
```
18/10/30 11:34:03.563 dispatcher-event-loop-12 INFO 
CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor 
NettyRpcEndpointRef(spark-client://Executor) (172.17.0.2:43656) with ID 2
18/10/30 11:34:03.593 dispatcher-event-loop-3 INFO 
CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor 
NettyRpcEndpointRef(spark-client://Executor) (172.17.0.2:43658) with ID 3
18/10/30 11:34:03.629 dispatcher-event-loop-6 INFO 
CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor 
NettyRpcEndpointRef(spark-client://Executor) (172.17.0.2:43654) with ID 0
18/10/30 11:34:03.885 
pool-1-thread-1-ScalaTest-running-CoarseGrainedSchedulerBackendSuite INFO 
CoarseGrainedSchedulerBackendSuite: 

= FINISHED o.a.s.scheduler.CoarseGrainedSchedulerBackendSuite: 'compute 
max number of concurrent tasks can be launched' =
```
And the following logs in executor 1 show it was still initializing when the timeout happened (at 18/10/30 11:34:03.885).
```
18/10/30 11:34:03.463 netty-rpc-connection-0 INFO TransportClientFactory: 
Successfully created connection to 54b6b6217301/172.17.0.2:33741 after 37 ms (0 
ms spent in bootstraps)
18/10/30 11:34:03.959 main INFO DiskBlockManager: Created local directory 
at 
/home/jenkins/workspace/core/target/tmp/spark-383518bc-53bd-4d9c-885b-d881f03875bf/executor-61c406e4-178f-40a6-ac2c-7314ee6fb142/blockmgr-03fb84a1-eedc-4055-8743-682eb3ac5c67
18/10/30 11:34:03.993 main INFO MemoryStore: MemoryStore started with 
capacity 546.3 MB
```

Hence, I think our current 10 seconds is not enough on a slow Jenkins 
machine. This PR just increases the timeout from 10 seconds to 60 seconds to 
make the test more stable.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark fix-flaky-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22910.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22910


commit 0a3095fd5610810004ef6a0d1e02581b78bfdea4
Author: Shixiong Zhu 
Date:   2018-10-30T22:08:26Z

Fix flaky CoarseGrainedSchedulerBackendSuite




---




[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...

2018-10-30 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22771
  
Thanks! Merging to master.


---




[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...

2018-10-29 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22771
  
@markhamstra any further comments?


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r228409787
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
   if (job.numFinished == job.numPartitions) {
 markStageAsFinished(resultStage)
 cleanupStateForJobAndIndependentStages(job)
+try {
+  // killAllTaskAttempts will fail if a 
SchedulerBackend does not implement
+  // killTask.
+  logInfo(s"Job ${job.jobId} is finished. Killing 
potential speculative or " +
+s"zombie tasks for this job")
--- End diff --

I created https://issues.apache.org/jira/browse/SPARK-25849 to improve the 
document.


---




[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...

2018-10-25 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22771
  
retest this please


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r228355772
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
   if (job.numFinished == job.numPartitions) {
 markStageAsFinished(resultStage)
 cleanupStateForJobAndIndependentStages(job)
+try {
+  // killAllTaskAttempts will fail if a 
SchedulerBackend does not implement
+  // killTask.
+  logInfo(s"Job ${job.jobId} is finished. Killing 
potential speculative or " +
+s"zombie tasks for this job")
--- End diff --

How about `... is finished. Cancelling ...`? This should be consistent with 
other places.


---




[GitHub] spark issue #22816: [SPARK-25822][PySpark]Fix a race condition when releasin...

2018-10-25 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22816
  
retest this please


---




[GitHub] spark pull request #22816: [SPARK-25822][PySpark]Fix a race condition when r...

2018-10-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22816#discussion_r228262084
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -114,7 +114,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 
 context.addTaskCompletionListener[Unit] { _ =>
   writerThread.shutdownOnTaskCompletion()
-  if (!reuseWorker || !released.get) {
+  if (!reuseWorker || released.compareAndSet(false, true)) {
--- End diff --

Addressed


---




[GitHub] spark issue #22816: [SPARK-25822][PySpark]Fix a race condition when releasin...

2018-10-24 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22816
  
cc @ueshin 


---




[GitHub] spark pull request #22816: [SPARK-25822][PySpark]Fix a race condition when r...

2018-10-24 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22816

[SPARK-25822][PySpark]Fix a race condition when releasing a Python worker

## What changes were proposed in this pull request?

There is a race condition when releasing a Python worker. If `ReaderIterator.handleEndOfDataSection` is not running in the task thread and a task is terminated early (such as with `take(N)`), the task completion listener may close the worker while `handleEndOfDataSection` can still put the worker back into the worker pool for reuse.


https://github.com/zsxwing/spark/commit/0e07b483d2e7c68f3b5c3c118d0bf58c501041b7
 is a patch to reproduce this issue.

I also found a user reported this in the mail list: 
http://mail-archives.apache.org/mod_mbox/spark-user/201610.mbox/%3CCAAUq=h+yluepd23nwvq13ms5hostkhx3ao4f4zqv6sgo5zm...@mail.gmail.com%3E

This PR fixes the issue by using `compareAndSet` to make sure we will never return a closed worker to the worker pool.
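
The pattern in isolation (a sketch, not the actual PythonRunner code): both the task completion listener and `handleEndOfDataSection` race on one atomic flag, and only the winner acts, so a worker that has been closed can never also be put back into the pool.

```scala
import java.util.concurrent.atomic.AtomicBoolean

val released = new AtomicBoolean(false)

// Called from handleEndOfDataSection: return the worker to the pool only if
// nobody has closed it yet.
def tryRelease(): Unit =
  if (released.compareAndSet(false, true)) { /* put the worker back into the pool */ }

// Called from the task completion listener: close the worker only if it has
// not already been handed back to the pool.
def tryClose(): Unit =
  if (released.compareAndSet(false, true)) { /* close the worker socket */ }
```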

## How was this patch tested?

Jenkins.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark fix-socket-closed

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22816.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22816


commit a22e38917b4f2893ce0d72febed4df1d3eb9fdd5
Author: Shixiong Zhu 
Date:   2018-10-24T07:51:26Z

Fix a race condition when releasing a Python worker




---




[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...

2018-10-23 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22771
  
I agree that the task reaper is a big change to the story and that we should reconsider SPARK-17064. Could we move the discussion to SPARK-17064?

By the way, regarding this PR itself, @tgravescs @markhamstra any further 
comments?


---




[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...

2018-10-22 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22771
  
@tgravescs Yeah, it looks like https://issues.apache.org/jira/browse/SPARK-24622 is better, but it may take more time than this one, since this PR is smaller and less risky.


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22771#discussion_r227060028
  
--- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala ---
@@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
   
assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
 }
   }
+
+  test("cancel zombie tasks in a result stage when the job finishes") {
+val conf = new SparkConf()
+  .setMaster("local-cluster[1,2,1024]")
+  .setAppName("test-cluster")
+  .set("spark.ui.enabled", "false")
+  // Disable this so that if a task is running, we can make sure the 
executor will always send
+  // task metrics via heartbeat to driver.
+  .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
+  // Set a short heartbeat interval to send 
SparkListenerExecutorMetricsUpdate fast
+  .set("spark.executor.heartbeatInterval", "1s")
+sc = new SparkContext(conf)
+sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+@volatile var runningTaskIds: Seq[Long] = null
+val listener = new SparkListener {
+  override def onExecutorMetricsUpdate(
+  executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit 
= {
+if (executorMetricsUpdate.execId != 
SparkContext.DRIVER_IDENTIFIER) {
+  runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
+}
+  }
+}
+sc.addSparkListener(listener)
+sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
+  val context = org.apache.spark.TaskContext.get()
+  if (context.stageAttemptNumber == 0) {
+if (context.partitionId == 0) {
+  // Make the first task in the first stage attempt fail.
+  throw new 
FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
+new java.io.IOException("fake"))
+} else {
+  // Make the second task in the first stage attempt sleep to 
generate a zombie task
+  Thread.sleep(6)
+}
+  } else {
+// Make the second stage attempt successful.
+  }
+  x
+}.collect()
+sc.listenerBus.waitUntilEmpty(1)
+// As executors will send the metrics of running tasks via heartbeat, 
we can use this to check
+// whether there is any running task.
--- End diff --

I prefer this way to make sure the executor did receive the kill command 
and interrupt the tasks.


---




[GitHub] spark pull request #22770: [SPARK-25771][PYSPARK]Fix improper synchronizatio...

2018-10-22 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22770#discussion_r227056168
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -31,15 +32,15 @@ import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}
 
 private[spark] class PythonWorkerFactory(pythonExec: String, envVars: 
Map[String, String])
-  extends Logging {
+  extends Logging { self =>
 
   import PythonWorkerFactory._
 
   // Because forking processes from Java is expensive, we prefer to launch 
a single Python daemon,
   // pyspark/daemon.py (by default) and tell it to fork new workers for 
our tasks. This daemon
   // currently only works on UNIX-based systems now because it uses 
signals for child management,
   // so we can also fall back to launching workers, pyspark/worker.py (by 
default) directly.
-  val useDaemon = {
+  private val useDaemon = {
--- End diff --

Fixing these since I'm touching a lot of fields in this file.


---




[GitHub] spark issue #22770: [SPARK-25771][PYSPARK]Fix improper synchronization in Py...

2018-10-22 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22770
  
Thanks for reviewing this. Merging to master.


---




[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...

2018-10-18 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22771
  
cc @squito @tgravescs @jiangxb1987 @kayousterhout 


---




[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...

2018-10-18 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22771

[SPARK-25773][Core]Cancel zombie tasks in a result stage when the job 
finishes

## What changes were proposed in this pull request?

When a job finishes, there may be some zombie tasks still running due to stage retry. Since a result stage will never be used by other jobs, running these tasks just wastes cluster resources. This PR asks TaskScheduler to cancel the running tasks of a result stage when the stage has already finished. Credits go to @srinathshankar, who suggested this idea to me.

This PR also fixes two minor issues while I'm touching DAGScheduler:
- Invalid spark.job.interruptOnCancel should not crash DAGScheduler.
- Non fatal errors should not crash DAGScheduler.

## How was this patch tested?

The new unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25773

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22771.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22771


commit 581ea53b57cc9fc0e89f2d635422653cfdfcb27f
Author: Shixiong Zhu 
Date:   2018-10-16T22:07:04Z

Cancel zombie tasks in a result stage when the job finishes




---




[GitHub] spark pull request #22770: [SPARK-25771][PYSPARK]Fix improper synchronizatio...

2018-10-18 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22770#discussion_r226459546
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -278,7 +289,7 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
 
 override def run() {
   while (true) {
-synchronized {
+self.synchronized {
--- End diff --

this is fix 1
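
For context, a standalone illustration of the bug class (not the Spark code): inside a nested thread, a bare `synchronized` locks the inner object, not the enclosing factory, so it does not exclude the factory's other synchronized methods.

```scala
class Factory { self =>
  private var workers = Map.empty[String, Int] // intended to be guarded by Factory's lock

  def register(name: String, pid: Int): Unit = synchronized { workers += name -> pid }

  val monitor: Thread = new Thread {
    override def run(): Unit = {
      // BUG: a bare `synchronized { ... }` here would lock this Thread instance.
      // FIX: lock the enclosing Factory, the same monitor `register` uses.
      self.synchronized {
        workers.foreach { case (name, pid) => println(s"$name -> $pid") }
      }
    }
  }
}
```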


---




[GitHub] spark pull request #22770: [SPARK-25771][PYSPARK]Fix improper synchronizatio...

2018-10-18 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22770#discussion_r226459609
  
--- Diff: 
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -163,7 +172,9 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
   try {
 val socket = serverSocket.accept()
 authHelper.authClient(socket)
-simpleWorkers.put(socket, worker)
+self.synchronized {
--- End diff --

this is fix 2.


---




[GitHub] spark pull request #22770: [SPARK-25771][PYSPARK]Fix improper synchronizatio...

2018-10-18 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22770

[SPARK-25771][PYSPARK]Fix improper synchronization in PythonWorkerFactory

## What changes were proposed in this pull request?

Fix the following issues in PythonWorkerFactory
- MonitorThread.run uses a wrong lock.
- `createSimpleWorker` misses `synchronized` when updating `simpleWorkers`.

Other changes are just to improve the code style to make the thread-safe 
contract clear.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark pwf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22770


commit 0369de2323640377c0f990bb47ebe112654ca498
Author: Shixiong Zhu 
Date:   2018-10-18T20:39:04Z

fix improper synchronization




---




[GitHub] spark issue #22692: [SPARK-25598][STREAMING][BUILD][test-maven] Remove flume...

2018-10-11 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22692
  
LGTM


---




[GitHub] spark pull request #22173: [SPARK-24355] Spark external shuffle server impro...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22173#discussion_r223513440
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 ---
@@ -77,17 +82,54 @@
   private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
   private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
 
+  // Separate thread pool for handling ChunkFetchRequest. This helps to 
enable throttling
+  // max number of TransportServer worker threads that are blocked on 
writing response
+  // of ChunkFetchRequest message back to the client via the underlying 
channel.
+  private static EventLoopGroup chunkFetchWorkers;
--- End diff --

Is there any special reason that this must be a global one? I have not yet looked at the details, but it looks like this may make ChunkFetchIntegrationSuite flaky, as there is no isolation between tests.


---




[GitHub] spark issue #22628: [SPARK-25641] Change the spark.shuffle.server.chunkFetch...

2018-10-08 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22628
  
Is it supposed to fix the flaky ChunkFetchIntegrationSuite? 
http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.network.ChunkFetchIntegrationSuite_name=fetchFileChunk


---




[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...

2018-10-08 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22627
  
LGTM


---




[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223490770
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long>() {
+    public void call(Dataset<String> dataset, Long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epoch_id):
+    # Transform and write batchDF
+    pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With `foreachBatch`, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
`foreachBatch`, you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.persist()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using `foreachBatch`, you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223490811
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long> {
+public void call(Dataset<String> dataset, Long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epoch_id):
+# Transform and write batchDF
+pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With `foreachBatch`, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
`foreachBatch`, you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.persist()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using `foreachBatch`, you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have 

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223491087
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long> {
+public void call(Dataset<String> dataset, Long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epoch_id):
--- End diff --

nit: `foreachBatchFunction` -> `foreach_batch_function`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
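
The guide text quoted above notes that the batchId passed to `foreachBatch` can be used
to deduplicate output and recover an exactly-once result on top of the default
at-least-once guarantee. A minimal, self-contained sketch of one such idempotence
pattern; the rate source and the /tmp output path below are placeholders for
illustration, not part of the PR under review:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()

    // A toy streaming source; any streaming DataFrame would do here.
    val streamingDF: DataFrame = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      // Overwriting a batchId-keyed location makes a retried micro-batch idempotent:
      // replaying the same batchId rewrites the same directory instead of appending duplicates.
      batchDF.write
        .mode("overwrite")
        .parquet(s"/tmp/output/batchId=$batchId")   // placeholder output path
    }.start()
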



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223491101
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,214 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream().foreachBatch(
+  new VoidFunction2<Dataset<String>, Long> {
+public void call(Dataset<String> dataset, Long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epoch_id):
+# Transform and write batchDF
+pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
--- End diff --

nit: `foreachBatchFunction` -> `foreach_batch_function`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-08 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r223481016
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark issue #22649: [SPARK-25644][SS][FOLLOWUP][BUILD] Fix Scala 2.12 build ...

2018-10-08 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22649
  
> and this only becomes ambiguous in 2.12 (long story).

Yeah, I'm curious why it didn't fail in 2.12 before. I know there are several Scala features that don't work well with overloading. This may be one of them.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
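
A standalone illustration of the kind of overload-plus-lambda ambiguity being discussed
here: when a method has both a Scala-function overload and a Java functional-interface
overload, Scala 2.12's SAM conversion can make an untyped lambda match both. The `Writer`
trait and `Sink.write` overloads below are hypothetical stand-ins, not the actual Spark API:

    trait Writer {                          // hypothetical single-abstract-method interface
      def call(value: String, id: java.lang.Long): Unit
    }

    object Sink {
      // Two overloads: a Scala function and a Java-friendly SAM type.
      def write(f: (String, Long) => Unit): Unit = f("row", 0L)
      def write(w: Writer): Unit = w.call("row", 0L)
    }

    // In Scala 2.12 a bare lambda is eligible for SAM conversion to Writer as well as
    // matching the (String, Long) => Unit overload, so a call like the following can
    // fail to resolve:
    // Sink.write((s, id) => println(s"$id: $s"))

    // Passing an explicit anonymous class (or spelling out the function type) picks one overload:
    Sink.write(new Writer {
      def call(value: String, id: java.lang.Long): Unit = println(s"$id: $value")
    })
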



[GitHub] spark issue #22649: [SPARK-25644][SS][FOLLOWUP][BUILD] Fix Scala 2.12 build ...

2018-10-08 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22649
  
Thanks for fixing this. I'm just curious why it didn't fail before my 
change from `Long` to `java.lang.Long`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22647: [SPARK-25655] [BUILD] Add Pspark-ganglia-lgpl to the sca...

2018-10-05 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22647
  
LGTM pending tests


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...

2018-10-05 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22633
  
Thanks. Merging to master and 2.4.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...

2018-10-04 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22633
  
Looks like `lint-java` doesn't catch any style issues in my PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222813906
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
--- End diff --

`uncache()` -> `unpersist()`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
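
A compact, runnable version of the multi-location pattern discussed above, with the
`unpersist()` suggestion applied. The rate source and /tmp paths are placeholders used
only to make the sketch self-contained:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder().master("local[*]").appName("multi-sink-sketch").getOrCreate()
    val streamingDF: DataFrame = spark.readStream.format("rate").load()

    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      // persist() once so the two writes below reuse the same computed micro-batch
      // instead of recomputing it (and possibly re-reading the source) for each write.
      batchDF.persist()
      batchDF.write.format("parquet").save(s"/tmp/location1/batch=$batchId")  // placeholder path
      batchDF.write.format("parquet").save(s"/tmp/location2/batch=$batchId")  // placeholder path
      batchDF.unpersist()
    }.start()
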



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815111
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815845
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222812874
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815770
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222812936
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
--- End diff --

nit: `writeStream()`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815072
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222818441
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = 
"complete", checkpointLocation = "pat
 
 
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed 
between restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either 
not allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change but whether the 
semantics of its effect 
+  is well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not do the specified change as 
the restarted query is likely 
+  to fail with unpredictable errors. `sdf` represents a streaming 
DataFrame/Dataset 
+  generated with sparkSession.readStream.
+  
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input 
sources*: This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed 
and whether the semantics 
+  of the change are well-defined depends on the source and the query. Here 
are a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files is generally not allowed as the 
results are unpredictable: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are 
a few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Kafka sink changed to foreach, or vice versa is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are 
a few examples.
+
+  - Changes to output directory of a file sink is not allowed: 
`sdf.writeStream.format("parquet").option("path", "/somePath")` to 
`sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to output topic is allowed: 
`sdf.writeStream.format("kafka").option("topic", "someTopic")` to 
`sdf.writeStream.format("kafka").option("path", "anotherTopic")`
+
+  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` 
code) is allowed, but the semantics of the change depends on the code.
+
+- *Changes in projection / filter / map-like operations**: Some cases are 
allowed. For example:
+
+  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to 
`sdf.where(...).selectExpr("a").filter(...)`.
+
+  - Changes in projections with same output schema is allowed: 
`sdf.selectExpr("stringColumn AS json").writeStream` to 
`sdf.select(to_json(...).as("json")).writeStream`.
--- End diff --

This example changes the schema, right? From `string` to `struct`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
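
One way to look into the question raised here is to compare the output schemas of the
two projections on a static frame. A minimal sketch, using a hypothetical DataFrame with
a single string column named stringColumn (not data from the PR):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{struct, to_json}

    val spark = SparkSession.builder().master("local[*]").appName("schema-check").getOrCreate()
    import spark.implicits._

    val sdf = Seq("""{"a": 1}""").toDF("stringColumn")

    sdf.selectExpr("stringColumn AS json").printSchema()
    // json: string

    sdf.select(to_json(struct($"stringColumn")).as("json")).printSchema()
    // to_json also yields a string column, so the top-level type is string in both cases;
    // whether a restarted query treats the two projections as equivalent still has to be verified.
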



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222817163
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222814573
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+void call(Dataset<String> dataset, long batchId) {
+  // Transform and write batchDF
+}
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.uncache()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
+continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222812757
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
--- End diff --

long -> Long. I noticed the current Java API actually is wrong. Submitted 
https://github.com/apache/spark/pull/22633 to fix it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222818615
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = 
"complete", checkpointLocation = "pat
 
 
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed 
between restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either 
not allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change but whether the 
semantics of its effect 
+  is well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not do the specified change as 
the restarted query is likely 
+  to fail with unpredictable errors. `sdf` represents a streaming 
DataFrame/Dataset 
+  generated with sparkSession.readStream.
+  
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input 
sources*: This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed 
and whether the semantics 
+  of the change are well-defined depends on the source and the query. Here 
are a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files is generally not allowed as the 
results are unpredictable: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are 
a few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Kafka sink changed to foreach, or vice versa is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are 
a few examples.
+
+  - Changes to output directory of a file sink is not allowed: 
`sdf.writeStream.format("parquet").option("path", "/somePath")` to 
`sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to output topic is allowed: 
`sdf.writeStream.format("kafka").option("topic", "someTopic")` to 
`sdf.writeStream.format("kafka").option("path", "anotherTopic")`
+
+  - Changes to the user-defined foreach sink (that is, the `ForeachWriter` 
code) is allowed, but the semantics of the change depends on the code.
+
+- *Changes in projection / filter / map-like operations*: Some cases are 
allowed. For example:
+
+  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to 
`sdf.where(...).selectExpr("a").filter(...)`.
+
+  - Changes in projections with same output schema is allowed: 
`sdf.selectExpr("stringColumn AS json").writeStream` to 
`sdf.select(to_json(...).as("json")).writeStream`.
+
+  - Changes in projections with different output schema are conditionally 
allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` 
is allowed only if the output sink allows the schema change from `"a"` to `"b"`.
+
+- *Changes in stateful operations*: Some operations in streaming queries 
need to maintain
+  state data in order to continuously update the result. Structured 
Streaming automatically checkpoints
+  the state data to fault-tolerant storage (for example, DBFS, AWS S3, 
Azure Blob storage) and restores it after restart.
--- End diff --

remove `DBFS`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222818119
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = 
"complete", checkpointLocation = "pat
 
 
 
+
+## Recovery Semantics after Changes in a Streaming Query
+There are limitations on what changes in a streaming query are allowed 
between restarts from the 
+same checkpoint location. Here are a few kinds of changes that are either 
not allowed, or 
+the effect of the change is not well-defined. For all of them:
+
+- The term *allowed* means you can do the specified change but whether the 
semantics of its effect 
+  is well-defined depends on the query and the change.
+
+- The term *not allowed* means you should not do the specified change as 
the restarted query is likely 
+  to fail with unpredictable errors. `sdf` represents a streaming 
DataFrame/Dataset 
+  generated with sparkSession.readStream.
+  
+**Types of changes**
+
+- *Changes in the number or type (i.e. different source) of input 
sources*: This is not allowed.
+
+- *Changes in the parameters of input sources*: Whether this is allowed 
and whether the semantics 
+  of the change are well-defined depends on the source and the query. Here 
are a few examples.
+
+  - Addition/deletion/modification of rate limits is allowed: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", 
"topic").option("maxOffsetsPerTrigger", ...)`
+
+  - Changes to subscribed topics/files is generally not allowed as the 
results are unpredictable: 
`spark.readStream.format("kafka").option("subscribe", "topic")` to 
`spark.readStream.format("kafka").option("subscribe", "newTopic")`
+
+- *Changes in the type of output sink*: Changes between a few specific 
combinations of sinks 
+  are allowed. This needs to be verified on a case-by-case basis. Here are 
a few examples.
+
+  - File sink to Kafka sink is allowed. Kafka will see only the new data.
+
+  - Kafka sink to file sink is not allowed.
+
+  - Kafka sink changed to foreach, or vice versa is allowed.
+
+- *Changes in the parameters of output sink*: Whether this is allowed and 
whether the semantics of 
+  the change are well-defined depends on the sink and the query. Here are 
a few examples.
+
+  - Changes to output directory of a file sink is not allowed: 
`sdf.writeStream.format("parquet").option("path", "/somePath")` to 
`sdf.writeStream.format("parquet").option("path", "/anotherPath")`
+
+  - Changes to output topic is allowed: 
`sdf.writeStream.format("kafka").option("topic", "someTopic")` to 
`sdf.writeStream.format("kafka").option("path", "anotherTopic")`
--- End diff --

nit: `path` -> `topic`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815808
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+    void call(Dataset<String> dataset, long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, a corresponding batch data 
writer does not exist, or you need the 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815830
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+    void call(Dataset<String> dataset, long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, a corresponding batch data 
writer does not exist, or you need the 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...

2018-10-04 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22627#discussion_r222815089
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
 
 
 
-# Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`

-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
-which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
+# Using Foreach and ForeachBatch
+The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
+logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
+allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
+and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
+
+## ForeachBatch
+`foreachBatch(...)` allows you to specify a function that is executed on 
+the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
+It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
+
+
+
+
+{% highlight scala %}
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
+  // Transform and write batchDF 
+}.start()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+streamingDatasetOfString.writeStream.foreachBatch(
+  new VoidFunction2<Dataset<String>, long> {
+    void call(Dataset<String> dataset, long batchId) {
+      // Transform and write batchDF
+    }
+  }
+).start();
+{% endhighlight %}
+
+
+
+
+{% highlight python %}
+def foreachBatchFunction(df, epochId):
+  # Transform and write batchDF
+  pass
+  
+streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
+{% endhighlight %}
+
+
+
+R is not yet supported.
+
+
+
+With foreachBatch, you can do the following.
+
+- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
+  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
+  data writers on the output of each micro-batch.
+- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
+  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
+  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
+  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
+
+streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
+  batchDF.cache()
+  batchDF.write.format(...).save(...)  // location 1
+  batchDF.write.format(...).save(...)  // location 2
+  batchDF.unpersist()
+}
+
+- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
+  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
+  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
+
+**Note:**
+- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
+  batchId provided to the function as a way to deduplicate the output and 
get an exactly-once guarantee.  
+- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
+  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
+
+
+## Foreach
+If `foreachBatch` is not an option (for example, a corresponding batch data 
writer does not exist, or you need the 
+continuous processing mode), then you can express your custom writer logic 
using `foreach`. 
+Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
+Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
+
+
+
+
+In Scala, you have to extend the class `For

[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...

2018-10-04 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22633

[SPARK-25644][SS]Fix java foreachBatch in DataStreamWriter

## What changes were proposed in this pull request?

The Java `foreachBatch` API in `DataStreamWriter` should accept 
`java.lang.Long` rather than `scala.Long`.

## How was this patch tested?

New java test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark fix-java-foreachbatch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22633.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22633


commit aed8c2457f8abff5b50d579dc3a4938cc36c5be1
Author: Shixiong Zhu 
Date:   2018-10-04T20:12:25Z

fix java foreachBatch




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...

2018-10-04 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22633
  
cc @tdas 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22586: [SPARK-25568][Core]Continue to update the remaini...

2018-09-29 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22586#discussion_r221438766
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -1880,6 +1880,26 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 assert(sc.parallelize(1 to 10, 2).count() === 10)
   }
 
+  test("misbehaved accumulator should not impact other accumulators") {
--- End diff --

> Also verify the log message?

That's not in the core project.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22473: [SPARK-25449][CORE] Heartbeat shouldn't include accumula...

2018-09-28 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22473
  
Thanks! Merging to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22586: [SPARK-25568][Core]Continue to update the remaini...

2018-09-28 Thread zsxwing
GitHub user zsxwing opened a pull request:

https://github.com/apache/spark/pull/22586

[SPARK-25568][Core]Continue to update the remaining accumulators when 
failing to update one accumulator

## What changes were proposed in this pull request?

Since we don't fail a job when `AccumulatorV2.merge` fails, we should try 
to update the remaining accumulators so that they can still report correct 
values.
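
In spirit, the change amounts to the following sketch (names such as `mergeIntoRegistered` are illustrative placeholders, not the actual `DAGScheduler` code):

```scala
import scala.util.control.NonFatal
import org.apache.spark.util.AccumulatorV2

// Update each accumulator independently, so that one failing merge no longer
// prevents the remaining accumulators from reporting correct values.
def updateAllAccumulators(updates: Seq[AccumulatorV2[_, _]]): Unit = {
  updates.foreach { acc =>
    try {
      mergeIntoRegistered(acc)   // placeholder for the real merge step
    } catch {
      case NonFatal(e) =>
        println(s"Failed to update accumulator ${acc.id}: $e")  // log and keep going
    }
  }
}
```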

## How was this patch tested?

The new unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark SPARK-25568

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22586.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22586


commit badb4711caff6e3d10ba9037f71c1ad6515577e8
Author: Shixiong Zhu 
Date:   2018-09-28T20:52:07Z

continue to update the remaining accumulators




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22473: [SPARK-25449][CORE] Heartbeat shouldn't include accumula...

2018-09-28 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22473
  
LGTM pending tests


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22473: [SPARK-25449][CORE] Heartbeat shouldn't include accumula...

2018-09-28 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22473
  
Looks like `org.apache.spark.deploy.history.HistoryServerSuite.executor 
list with executor metrics json` and 
`org.apache.spark.util.JsonProtocolSuite.SparkListenerEvent` are broken by the 
changes. Could you also fix them?




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-28 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r221313411
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -609,13 +609,13 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Seria
 require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
   s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling 
encryption.")
 
-val executorTimeoutThreshold = 
getTimeAsSeconds("spark.network.timeout", "120s")
-val executorHeartbeatInterval = 
getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+val executorTimeoutThreshold = getTimeAsMs("spark.network.timeout", 
"120s")
--- End diff --

Could you use `getTimeAsSeconds` and manually convert it to `ms`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-27 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r221022783
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -609,13 +609,13 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Seria
 require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
   s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling 
encryption.")
 
-val executorTimeoutThreshold = 
getTimeAsSeconds("spark.network.timeout", "120s")
-val executorHeartbeatInterval = 
getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+val executorTimeoutThreshold = getTimeAsMs("spark.network.timeout", 
"120s")
+val executorHeartbeatInterval = get(EXECUTOR_HEARTBEAT_INTERVAL)
 // If spark.executor.heartbeatInterval bigger than 
spark.network.timeout,
 // it will almost always cause ExecutorLostFailure. See SPARK-22754.
 require(executorTimeoutThreshold > executorHeartbeatInterval, "The 
value of " +
-  s"spark.network.timeout=${executorTimeoutThreshold}s must be no less 
than the value of " +
-  s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
+  s"spark.network.timeout=${executorTimeoutThreshold}ms must be no 
less than the value of " +
--- End diff --

nit: "ms" -> "s" once you address the above comment


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-27 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r221022651
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -609,13 +609,13 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Seria
 require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
   s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling 
encryption.")
 
-val executorTimeoutThreshold = 
getTimeAsSeconds("spark.network.timeout", "120s")
-val executorHeartbeatInterval = 
getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+val executorTimeoutThreshold = getTimeAsMs("spark.network.timeout", 
"120s")
--- End diff --

Could you change `getTimeAsMs` back to `getTimeAsSeconds`? There is a 
slight difference when the user doesn't specify the time unit. `getTimeAsMs` 
uses `ms` as default, while `getTimeAsSeconds` uses `seconds`.
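
A small illustration of the difference, using `SparkConf` directly (sketch):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.network.timeout", "120")  // no unit given

conf.getTimeAsSeconds("spark.network.timeout", "120s")  // 120, read as 120 seconds
conf.getTimeAsMs("spark.network.timeout", "120s")       // 120, read as 120 milliseconds!

// The suggestion: keep the seconds-based lookup and convert explicitly.
val executorTimeoutThresholdMs =
  conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000
```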


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22553: [SPARK-25541][SQL] CaseInsensitiveMap should be s...

2018-09-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22553#discussion_r220718383
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
 ---
@@ -42,7 +42,11 @@ class CaseInsensitiveMap[T] private (val originalMap: 
Map[String, T]) extends Ma
   override def iterator: Iterator[(String, T)] = keyLowerCasedMap.iterator
 
   override def -(key: String): Map[String, T] = {
-new 
CaseInsensitiveMap(originalMap.filterKeys(!_.equalsIgnoreCase(key)))
+new CaseInsensitiveMap(originalMap.filter(!_._1.equalsIgnoreCase(key)))
+  }
+
+  override def filterKeys(p: (String) => Boolean): Map[String, T] = {
--- End diff --

This method right now doesn't follow the Scaladoc:
```
   *  @return an immutable map consisting only of those key value pairs of 
this map where the key satisfies
   *  the predicate `p`. The resulting map wraps the original map 
without copying any elements.
```
Maybe we should not add it.

This is a Scala issue, but they didn't fix it until 2.13 
(https://github.com/scala/bug/issues/4776).
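
To illustrate the contract in question with plain Scala 2.12, independent of Spark (sketch):

```scala
val m = Map("path" -> "/tmp/a", "Header" -> "true")

// The default filterKeys returns a lazy view over the original map -- exactly the
// "wraps the original map without copying" behavior the Scaladoc describes -- but
// that view is not serializable, which is what SPARK-25541 works around.
val view = m.filterKeys(_ != "path")   // a view; the predicate re-runs on every access

// An override that eagerly builds a new CaseInsensitiveMap fixes serializability,
// yet no longer matches the documented "no copying" semantics quoted above.
```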


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22507: [SPARK-25495][SS]FetchedData.reset should reset all fiel...

2018-09-25 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22507
  
Thanks! Merging to master and 2.4.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...

2018-09-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22507#discussion_r220286600
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
   )
 }
   }
+
+  test("SPARK-25495: FetchedData.reset should reset all fields") {
+val topic = newTopic()
+val topicPartition = new TopicPartition(topic, 0)
+testUtils.createTopic(topic, partitions = 1)
+
+val ds = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("kafka.isolation.level", "read_committed")
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .load()
+  .select($"value".as[String])
+
+testUtils.withTranscationalProducer { producer =>
+  producer.beginTransaction()
+  (0 to 3).foreach { i =>
+producer.send(new ProducerRecord[String, String](topic, 
i.toString)).get()
+  }
+  producer.commitTransaction()
+}
+testUtils.waitUntilOffsetAppears(topicPartition, 5)
+
+val q = ds.writeStream.foreachBatch { (ds, epochId) =>
+  if (epochId == 0) {
+// Post more messages to Kafka so that the executors will fetch 
messages in the next batch
+// and drop them. In this case, if we forget to reset 
`FetchedData._nextOffsetInFetchedData`
--- End diff --

Send more messages here so that the executor will prefetch 8 records even 
if the driver tells it to fetch just 4 records. As there is a gap caused by the 
commit marker, `InternalKafkaConsumer` will call `reset` to drop the prefetched 
data. However, before my fix, it will not reset `_nextOffsetInFetchedData`. 
When running the next batch, the offset we try to fetch will match 
`_nextOffsetInFetchedData` and hit this line: 
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L360

As we already dropped the records, `fetchedData.hasNext` will be `false`. 
In addition, `offset` is also < `fetchedData.offsetAfterPoll`, we will just 
skip records between `offset` and `fetchedData.offsetAfterPoll`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22429: [SPARK-25440][SQL] Dumping query execution info to a fil...

2018-09-24 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/22429
  
@MaxGekk Makes sense. Could you also try to remove the default value?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-09-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r219977185
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -196,7 +196,7 @@ case class RDDScanExec(
 }
   }
 
-  override def simpleString: String = {
+  override def simpleString(maxFields: Option[Int]): String = {
 s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}"
--- End diff --

`maxFields` should be used here.
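
i.e. something along these lines, assuming the `Utils.truncatedString` overload this PR introduces accepts the field limit as its last argument (sketch):

```scala
override def simpleString(maxFields: Option[Int]): String = {
  s"$nodeName${Utils.truncatedString(output, "[", ",", "]", maxFields)}"
}
```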


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-09-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r219969937
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -250,5 +265,22 @@ class QueryExecution(val sparkSession: SparkSession, 
val logical: LogicalPlan) {
 def codegenToSeq(): Seq[(String, String)] = {
   org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
 }
+
+/**
+ * Dumps debug information about query execution into the specified 
file.
+ */
+def toFile(path: String): Unit = {
+  val filePath = new Path(path)
+  val fs = FileSystem.get(filePath.toUri, 
sparkSession.sessionState.newHadoopConf())
+  val writer = new OutputStreamWriter(fs.create(filePath))
--- End diff --

`val writer = new BufferedWriter(new 
OutputStreamWriter(fs.create(filePath), UTF_8))`
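
Spelled out with the imports it needs, plus a try/finally added for illustration (sketch of the suggested change):

```scala
import java.io.{BufferedWriter, OutputStreamWriter}
import java.nio.charset.StandardCharsets.UTF_8

val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath), UTF_8))
try {
  // write the query execution debug output here
} finally {
  writer.close()
}
```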


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219940593
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -83,6 +83,17 @@ package object config {
   private[spark] val EXECUTOR_CLASS_PATH =
 
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
+  private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS =
+
ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true)
--- End diff --

@srowen Since the user can see these accumulator updates in the public API 
`SparkListenerExecutorMetricsUpdate`, I would prefer to add a flag in case 
someone really needs these zero updates. E.g., a user may use the listener API 
to get all accumulators used in a task. After this change, they cannot get them 
until the task finishes.
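
For instance, a listener along these lines (the class name is made up; it only assumes the existing `SparkListenerExecutorMetricsUpdate` event) would see fewer accumulator entries per heartbeat once zero-valued updates are dropped:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

class AccumulatorUpdateWatcher extends SparkListener {
  override def onExecutorMetricsUpdate(
      update: SparkListenerExecutorMetricsUpdate): Unit = {
    // accumUpdates is a Seq of (taskId, stageId, stageAttemptId, accumulator updates)
    update.accumUpdates.foreach { case (taskId, stageId, attempt, accums) =>
      println(s"task $taskId (stage $stageId.$attempt): ${accums.map(_.name).mkString(", ")}")
    }
  }
}
```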


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219576996
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -83,6 +83,17 @@ package object config {
   private[spark] val EXECUTOR_CLASS_PATH =
 
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
+  private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS =
+
ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true)
+
+  private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
+ConfigBuilder("spark.executor.heartbeatInterval")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("10s")
+
+  private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES =
+
ConfigBuilder("spark.executor.heartbeat.maxFailures").intConf.createWithDefault(60)
--- End diff --

nit: call `internal()` to indicate that this is not a public config.
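
i.e. roughly:

```scala
private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES =
  ConfigBuilder("spark.executor.heartbeat.maxFailures")
    .internal()
    .intConf
    .createWithDefault(60)
```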


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219575442
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -799,15 +799,21 @@ private[spark] class Executor(
   if (taskRunner.task != null) {
 taskRunner.task.metrics.mergeShuffleReadMetrics()
 taskRunner.task.metrics.setJvmGCTime(curGCTime - 
taskRunner.startGCTime)
-accumUpdates += ((taskRunner.taskId, 
taskRunner.task.metrics.accumulators()))
+val accumulatorsToReport =
+  if (conf.getBoolean(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key, 
true)) {
+taskRunner.task.metrics.accumulators().filterNot(_.isZero)
+  } else {
+taskRunner.task.metrics.accumulators()
+  }
+accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
   }
 }
 
 val message = Heartbeat(executorId, accumUpdates.toArray, 
env.blockManager.blockManagerId,
   executorUpdates)
 try {
   val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-  message, RpcTimeout(conf, "spark.executor.heartbeatInterval", 
"10s"))
+  message, RpcTimeout(conf, EXECUTOR_HEARTBEAT_INTERVAL.key, 
"10s"))
--- End diff --

Could you add a new `apply` method to `object RpcTimeout` to support 
`ConfigEntry`?
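
A sketch of such an overload, to sit inside the existing `object RpcTimeout` (it assumes the private constructor taking a `FiniteDuration` and the property key used in timeout error messages, and that the `ConfigEntry[Long]` holds milliseconds, as `EXECUTOR_HEARTBEAT_INTERVAL` does):

```scala
import scala.concurrent.duration._

def apply(conf: SparkConf, timeoutConf: ConfigEntry[Long]): RpcTimeout = {
  new RpcTimeout(conf.get(timeoutConf).millis, timeoutConf.key)
}
```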


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219574228
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -160,7 +160,7 @@ private[spark] class Executor(
* times, it should kill itself. The default value is 60. It means we 
will retry to send
* heartbeats about 10 minutes because the heartbeat interval is 10s.
*/
-  private val HEARTBEAT_MAX_FAILURES = 
conf.getInt("spark.executor.heartbeat.maxFailures", 60)
+  private val HEARTBEAT_MAX_FAILURES = 
conf.getInt(EXECUTOR_HEARTBEAT_MAX_FAILURES.key, 60)
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219577386
  
--- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
---
@@ -252,18 +253,121 @@ class ExecutorSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSug
 }
   }
 
+  test("Heartbeat should drop zero metrics") {
+heartbeatZeroMetricTest(true)
+  }
+
+  test("Heartbeat should not drop zero metrics when the conf is set to 
false") {
+heartbeatZeroMetricTest(false)
+  }
+
+  private def withHeartbeatExecutor(confs: (String, String)*)
+  (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = {
+val conf = new SparkConf
+confs.foreach { case (k, v) => conf.set(k, v) }
+val serializer = new JavaSerializer(conf)
+val env = createMockEnv(conf, serializer)
+val executor =
+  new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, 
isLocal = true)
+val executorClass = classOf[Executor]
+
+// Set ExecutorMetricType.values to be a minimal set to avoid getting null 
exceptions
+val metricClass =
+  
Utils.classForName(classOf[org.apache.spark.metrics.ExecutorMetricType].getName()
 + "$")
+val metricTypeValues = metricClass.getDeclaredField("values")
+metricTypeValues.setAccessible(true)
+metricTypeValues.set(
+  org.apache.spark.metrics.ExecutorMetricType,
+  IndexedSeq(JVMHeapMemory, JVMOffHeapMemory))
+
+// Save all heartbeats sent into an ArrayBuffer for verification
+val heartbeats = ArrayBuffer[Heartbeat]()
+val mockReceiver = mock[RpcEndpointRef]
+when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
+  .thenAnswer(new Answer[HeartbeatResponse] {
+override def answer(invocation: InvocationOnMock): 
HeartbeatResponse = {
+  val args = invocation.getArguments()
+  val mock = invocation.getMock
+  heartbeats += args(0).asInstanceOf[Heartbeat]
+  HeartbeatResponse(false)
+}
+  })
+val receiverRef = 
executorClass.getDeclaredField("heartbeatReceiverRef")
+receiverRef.setAccessible(true)
+receiverRef.set(executor, mockReceiver)
+
+f(executor, heartbeats)
+  }
+
+  private def invokeReportHeartbeat(executor: Executor): Unit = {
+val method = classOf[Executor]
+  
.getDeclaredMethod("org$apache$spark$executor$Executor$$reportHeartBeat")
+method.setAccessible(true)
+method.invoke(executor)
+  }
+
+  private def heartbeatZeroMetricTest(dropZeroMetrics: Boolean): Unit = {
+val c = "spark.executor.heartbeat.dropZeroMetrics" -> 
dropZeroMetrics.toString
--- End diff --

nit: EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219575944
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -799,15 +799,21 @@ private[spark] class Executor(
   if (taskRunner.task != null) {
 taskRunner.task.metrics.mergeShuffleReadMetrics()
 taskRunner.task.metrics.setJvmGCTime(curGCTime - 
taskRunner.startGCTime)
-accumUpdates += ((taskRunner.taskId, 
taskRunner.task.metrics.accumulators()))
+val accumulatorsToReport =
+  if (conf.getBoolean(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key, 
true)) {
--- End diff --

nit: I would prefer to keep this config value close to 
`HEARTBEAT_MAX_FAILURES` to avoid searching it in configs every heartbeat.
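
e.g. read the flag once when the `Executor` is constructed and reuse it in `reportHeartBeat` (sketch; the field name is illustrative):

```scala
// Next to HEARTBEAT_MAX_FAILURES:
private val dropZeroMetricsInHeartbeat = conf.get(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS)

// ...then inside reportHeartBeat, instead of conf.getBoolean(...):
val accumulatorsToReport =
  if (dropZeroMetricsInHeartbeat) taskRunner.task.metrics.accumulators().filterNot(_.isZero)
  else taskRunner.task.metrics.accumulators()
```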


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...

2018-09-21 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/22473#discussion_r219576946
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -83,6 +83,17 @@ package object config {
   private[spark] val EXECUTOR_CLASS_PATH =
 
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
+  private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS =
+
ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true)
--- End diff --

Also please call `internal()` to indicate that this is not a public config.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


