[GitHub] spark issue #22952: [SPARK-20568][SS] Provide option to clean up completed f...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22952

> @zsxwing Btw, how do you think about addressing background move/deletion (I had thought and

Yeah, this can be done in a separate ticket. I was playing with `org.apache.hadoop.fs.GlobFilter` to see how to detect the overlap. But one major issue is that before getting the target path, we don't know whether a path will match [the glob pattern](https://self-learning-java-tutorial.blogspot.com/2016/01/hadoop-java-globbing.html) or not. In the worst case, we can check the overlap when parsing the options for a normal path. For a glob path, we can use `GlobFilter/GlobPattern` to check before doing the rename, in which case we can just use the target path. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
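The overlap check being discussed can be sketched with the JDK's own glob `PathMatcher`, used here as a dependency-free stand-in for Hadoop's `GlobFilter` (names and exact glob semantics are illustrative, not Spark's implementation):

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobOverlapCheck {
    // Returns true if the candidate path would be matched by the source glob,
    // i.e. a file archived there could be picked up again as a new source file.
    static boolean matchesSourceGlob(String glob, String candidate) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Paths.get(candidate));
    }

    public static void main(String[] args) {
        // Archiving into a directory covered by the source glob re-ingests the file.
        System.out.println(matchesSourceGlob("/data/in/*.txt", "/data/in/archived.txt"));
        // Archiving outside the glob is safe.
        System.out.println(matchesSourceGlob("/data/in/*.txt", "/archived/here/a.txt"));
    }
}
```

This also shows the problem mentioned above: the check can only be performed once a concrete target path is known, because a glob pattern cannot be compared against a directory prefix directly.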
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237319928 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -257,16 +289,65 @@ class FileStreamSource( * equal to `end` and will only request offsets greater than `end` in the future. */ override def commit(end: Offset): Unit = { -// No-op for now; FileStreamSource currently garbage-collects files based on timestamp -// and the value of the maxFileAge parameter. +def move(entry: FileEntry, baseArchiveDirPath: String): Unit = { + val curPath = new Path(entry.path) --- End diff -- ditto
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237314690 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -530,6 +530,12 @@ Here are the details of all the sources in Spark. "s3://a/dataset.txt" "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt" +cleanSource: option to clean up completed files after processing. +Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op". +When "archive" is provided, the additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of the source path, to ensure archived files are never included as new source files again. +Spark will move source files while respecting their own paths. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt". +NOTE: Both archiving (via moving) and deleting completed files introduce overhead (slow down) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which is a heavy operation. +NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted, which may impact the other queries. --- End diff -- NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
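The documented mapping ("/a/b/dataset.txt" archived under "/archived/here" becomes "/archived/here/a/b/dataset.txt") amounts to appending the file's own path to the archive directory. A minimal sketch, with a hypothetical helper name:

```java
import java.net.URI;

public class ArchivePathMapping {
    // The archived file keeps its own path, appended under the archive directory.
    static String archivedPath(String archiveDir, String sourceFile) {
        return archiveDir + URI.create(sourceFile).getPath();
    }

    public static void main(String[] args) {
        System.out.println(archivedPath("/archived/here", "/a/b/dataset.txt"));
        // /archived/here/a/b/dataset.txt
    }
}
```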
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237319903 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -257,16 +289,65 @@ class FileStreamSource( * equal to `end` and will only request offsets greater than `end` in the future. */ override def commit(end: Offset): Unit = { -// No-op for now; FileStreamSource currently garbage-collects files based on timestamp -// and the value of the maxFileAge parameter. +def move(entry: FileEntry, baseArchiveDirPath: String): Unit = { + val curPath = new Path(entry.path) + val curPathUri = curPath.toUri + + val newPath = new Path(baseArchiveDirPath + curPathUri.getPath) + try { +logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}") +if (!fs.exists(newPath.getParent)) { + fs.mkdirs(newPath.getParent) +} + +logDebug(s"Archiving completed file $curPath to $newPath") +fs.rename(curPath, newPath) + } catch { +case NonFatal(e) => + // Log to error but swallow exception to avoid process being stopped + logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e) + } +} + +def remove(entry: FileEntry): Unit = { + val curPath = new Path(entry.path) --- End diff -- `val curPath = new Path(new URI(entry.path))` to make it escape/unescape path properly. `entry.path` was created from `Path.toUri.toString`. Could you also add a unit test to test special paths such as `/a/b/a b%.txt`?
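The round-trip behind this suggestion can be shown with plain `java.net.URI`, as a simplified model of what Hadoop's `Path.toUri.toString` produces (the helper names here are hypothetical):

```java
import java.net.URI;
import java.net.URISyntaxException;

public class PathEscaping {
    // Encode a raw filesystem path the way Path.toUri.toString would:
    // special characters such as spaces and '%' become percent-escapes.
    static String encode(String rawPath) throws URISyntaxException {
        return new URI("file", null, rawPath, null).toString();
    }

    // Recover the original path by parsing the stored string as a URI; this is
    // what `new Path(new URI(entry.path))` achieves, whereas `new Path(entry.path)`
    // would keep the percent-escapes as literal characters in the file name.
    static String decode(String stored) {
        return URI.create(stored).getPath();
    }

    public static void main(String[] args) throws URISyntaxException {
        String stored = encode("/a/b/a b%.txt");
        System.out.println(stored);
        System.out.println(decode(stored));
    }
}
```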
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237319176 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -257,16 +289,65 @@ class FileStreamSource( * equal to `end` and will only request offsets greater than `end` in the future. */ override def commit(end: Offset): Unit = { -// No-op for now; FileStreamSource currently garbage-collects files based on timestamp -// and the value of the maxFileAge parameter. +def move(entry: FileEntry, baseArchiveDirPath: String): Unit = { + val curPath = new Path(entry.path) + val curPathUri = curPath.toUri + + val newPath = new Path(baseArchiveDirPath + curPathUri.getPath) + try { +logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}") +if (!fs.exists(newPath.getParent)) { + fs.mkdirs(newPath.getParent) +} + +logDebug(s"Archiving completed file $curPath to $newPath") +fs.rename(curPath, newPath) + } catch { +case NonFatal(e) => + // Log to error but swallow exception to avoid process being stopped + logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e) + } +} + +def remove(entry: FileEntry): Unit = { + val curPath = new Path(entry.path) + try { +logDebug(s"Removing completed file $curPath") +fs.delete(curPath, false) + } catch { +case NonFatal(e) => + // Log to error but swallow exception to avoid process being stopped + logWarning(s"Fail to remove $curPath / skip removing file.", e) + } +} + +val logOffset = FileStreamSourceOffset(end).logOffset +metadataLog.get(logOffset) match { --- End diff -- you can use `val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)` to use the underlying cache in FileStreamSourceLog.
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237200636 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala --- @@ -74,6 +76,39 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging */ val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false) + /** + * The archive directory to move completed files. The option will be only effective when + * "cleanSource" is set to "archive". + * + * Note that the completed file will be moved to this archive directory with respecting to + * its own path. + * + * For example, if the path of source file is "/a/b/dataset.txt", and the path of archive + * directory is "/archived/here", file will be moved to "/archived/here/a/b/dataset.txt". + */ + val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir") + + /** + * Defines how to clean up completed files. Available options are "archive", "delete", "no_op". + */ + val cleanSource: CleanSourceMode.Value = { +val modeStrOption = parameters.getOrElse("cleanSource", CleanSourceMode.NO_OP.toString) --- End diff -- nit: could you create a method in `CleanSourceMode` to convert a string to `CleanSourceMode.Value`?
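The suggested conversion method could look roughly like this (a hedged Java sketch; Spark's actual implementation is Scala, and the exact names and error handling may differ):

```java
import java.util.Locale;

public class CleanSourceModes {
    enum CleanSourceMode { ARCHIVE, DELETE, NO_OP }

    // Sketch of the suggested helper: convert the user-provided "cleanSource"
    // option string into a mode. An absent option maps to NO_OP; an unknown
    // value is rejected up front instead of silently ignored.
    static CleanSourceMode fromString(String value) {
        if (value == null) return CleanSourceMode.NO_OP;
        switch (value.toLowerCase(Locale.ROOT)) {
            case "archive": return CleanSourceMode.ARCHIVE;
            case "delete":  return CleanSourceMode.DELETE;
            case "no_op":   return CleanSourceMode.NO_OP;
            default:
                throw new IllegalArgumentException("Invalid value for cleanSource: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(fromString("Archive"));
        System.out.println(fromString(null));
    }
}
```

Centralizing the parsing in one method keeps the option case-insensitive and gives a single place to validate values such as requiring `sourceArchiveDir` when the mode is `ARCHIVE`.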
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237315173 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -100,6 +101,36 @@ class FileStreamSource( logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs") + ensureNoOverlapBetweenSourceAndArchivePath() --- End diff -- Could you do this check only when CleanSourceMode is `ARCHIVE`?
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237315718 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -257,16 +289,65 @@ class FileStreamSource( * equal to `end` and will only request offsets greater than `end` in the future. */ override def commit(end: Offset): Unit = { -// No-op for now; FileStreamSource currently garbage-collects files based on timestamp -// and the value of the maxFileAge parameter. +def move(entry: FileEntry, baseArchiveDirPath: String): Unit = { + val curPath = new Path(entry.path) + val curPathUri = curPath.toUri + + val newPath = new Path(baseArchiveDirPath + curPathUri.getPath) + try { +logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}") +if (!fs.exists(newPath.getParent)) { + fs.mkdirs(newPath.getParent) +} + +logDebug(s"Archiving completed file $curPath to $newPath") +fs.rename(curPath, newPath) --- End diff -- It's better to also check the return value of `rename`. A user may reuse a source archive dir and cause path conflicts. We should also log this.
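The failure mode described here, a rename that silently fails because the target already exists, can be modeled with `java.nio.file`. Note the difference in API contracts: Hadoop's `fs.rename` returns `false` on such a conflict, while the NIO sketch below surfaces the same condition as an exception (names are hypothetical):

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ArchiveMove {
    // Check the outcome of the rename: if the target already exists (e.g. the
    // user reused an archive dir across runs), log and skip instead of failing.
    static boolean tryArchive(Path source, Path target) {
        try {
            Files.createDirectories(target.getParent());
            Files.move(source, target); // strict rename: fails if target exists
            return true;
        } catch (FileAlreadyExistsException e) {
            System.err.println("Archive path conflict, skipping: " + target);
            return false;
        } catch (IOException e) {
            System.err.println("Failed to move " + source + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("archive-demo");
        Path src = Files.createFile(dir.resolve("dataset.txt"));
        Path dst = dir.resolve("archived").resolve("dataset.txt");
        System.out.println(tryArchive(src, dst));  // first move succeeds

        Path src2 = Files.createFile(dir.resolve("dataset.txt"));
        System.out.println(tryArchive(src2, dst)); // conflict: target exists
    }
}
```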
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237320515 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -100,6 +101,36 @@ class FileStreamSource( logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs") + ensureNoOverlapBetweenSourceAndArchivePath() + + private def ensureNoOverlapBetweenSourceAndArchivePath(): Unit = { +@tailrec +def removeGlob(path: Path): Path = { + if (path.getName.contains("*")) { +removeGlob(path.getParent) + } else { +path + } +} + +sourceOptions.sourceArchiveDir match { + case None => + case Some(archiveDir) => +val sourceUri = removeGlob(qualifiedBasePath).toUri +val archiveUri = new Path(archiveDir).toUri + +val sourcePath = sourceUri.getPath +val archivePath = archiveUri.getPath --- End diff -- we need to use `fs.makeQualified` to turn all user provided paths to absolute paths as the user may just pass a relative path.
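The effect of `fs.makeQualified` on a relative path, and why it matters for the overlap check, can be sketched with `java.nio.file` on the local filesystem (a simplified model: real qualification also attaches the filesystem scheme and authority; names are hypothetical):

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class OverlapCheck {
    // Mimic fs.makeQualified for the local filesystem: resolve a possibly
    // relative user-provided path against a working directory and normalize it.
    static Path qualify(Path workingDir, String userPath) {
        return workingDir.resolve(userPath).normalize();
    }

    // Overlap in either direction is unsafe: archiving under the source path
    // re-ingests files; a source under the archive path is equally wrong.
    static boolean overlaps(Path source, Path archive) {
        return archive.startsWith(source) || source.startsWith(archive);
    }

    public static void main(String[] args) {
        Path cwd = Paths.get("/user/spark");
        Path source = qualify(cwd, "/data/in");
        // A relative archive dir like "../../data/in/archived" silently lands
        // under the source path once qualified -- the case the check must catch.
        Path archive = qualify(cwd, "../../data/in/archived");
        System.out.println(overlaps(source, archive)); // true
    }
}
```

Comparing the raw strings without qualification would miss this case entirely, since "../../data/in/archived" does not textually contain "/data/in".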
[GitHub] spark pull request #22952: [SPARK-20568][SS] Provide option to clean up comp...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22952#discussion_r237314459 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -530,6 +530,12 @@ Here are the details of all the sources in Spark. "s3://a/dataset.txt" "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt" +cleanSource: option to clean up completed files after processing. +Available options are "archive", "delete", "no_op". If the option is not provided, the default value is "no_op". +When "archive" is provided, the additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of the source path, to ensure archived files are never included as new source files again. +Spark will move source files while respecting their own paths. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt". +NOTE: Both archiving (via moving) and deleting completed files introduce overhead (slow down) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which is a heavy operation. +NOTE 2: The source path should not be used from multiple queries when enabling this option, because source files will be moved or deleted, which may impact the other queries. --- End diff -- NOTE 2: The source path should not be used from multiple **sources or** queries when enabling this option, because source files will be moved or deleted which behavior may impact the other **sources and** queries.
[GitHub] spark issue #23109: [SPARK-26069][TESTS][FOLLOWUP]Add another possible error...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/23109 cc @squito
[GitHub] spark pull request #23109: [SPARK-26069][TESTS][FOLLOWUP]Add another possibl...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/23109

[SPARK-26069][TESTS][FOLLOWUP] Add another possible error message

## What changes were proposed in this pull request?

`org.apache.spark.network.RpcIntegrationSuite.sendRpcWithStreamFailures` is still flaky; here is the error message:

```
sbt.ForkMain$ForkError: java.lang.AssertionError: Got a non-empty set [Failed to send RPC RPC 8249697863992194475 to /172.17.0.2:41177: java.io.IOException: Broken pipe]
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.apache.spark.network.RpcIntegrationSuite.assertErrorAndClosed(RpcIntegrationSuite.java:389)
	at org.apache.spark.network.RpcIntegrationSuite.sendRpcWithStreamFailures(RpcIntegrationSuite.java:347)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
	at com.novocode.junit.JUnitRunner$1.execute(JUnitRunner.java:132)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

This happened when the second RPC message was being sent but the connection was closed at the same time.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-26069-2

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23109.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23109

commit 91193e73c570158697150a35a8977c61d5c3f86b
Author: Shixiong Zhu
Date: 2018-11-21T19:49:45Z

    another possible error message
[GitHub] spark issue #23089: [SPARK-26120][TESTS][SS][SPARKR]Fix a streaming query le...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/23089 cc @felixcheung
[GitHub] spark pull request #23089: [SPARK-26120][TESTS][SS][SPARKR]Fix a streaming q...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/23089

[SPARK-26120][TESTS][SS][SPARKR] Fix a streaming query leak in Structured Streaming R tests

## What changes were proposed in this pull request?

Stop the streaming query in `Specify a schema by using a DDL-formatted string when reading` to avoid outputting annoying logs.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-26120

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23089.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23089

commit 3353bb1db87cc2f51545b28fcc2f83e6037eb1c3
Author: Shixiong Zhu
Date: 2018-11-19T19:50:20Z

    Fix a streaming query leak in Structured Streaming R tests
[GitHub] spark issue #23060: [SPARK-26092][SS]Use CheckpointFileManager to write the ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/23060 Thanks! Merging to master and 2.4.
[GitHub] spark issue #23034: [SPARK-26035][PYTHON] Break large streaming/tests.py fil...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/23034 LGTM. By the way, @HyukjinKwon I totally understand that this PR needs to be merged soon to avoid getting conflicts. But could you please at least get someone to review and sign off before merging next time? Thanks!
[GitHub] spark pull request #23060: [SPARK-26092][SS]Use CheckpointFileManager to wri...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/23060

[SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file

## What changes were proposed in this pull request?

Use CheckpointFileManager to write the streaming `metadata` file so that the `metadata` file will never be a partial file.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-26092

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23060.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23060

commit 4cb6abe5cb44692075841c446aca8d9cf12b8968
Author: Shixiong Zhu
Date: 2018-11-16T06:26:43Z

    Use CheckpointFileManager to write the streaming metadata file
[GitHub] spark issue #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegrationSuite....
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/23041 Thanks. Merging to master and 2.4.
[GitHub] spark issue #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegrationSuite....
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/23041 retest this please
[GitHub] spark pull request #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegratio...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/23041#discussion_r233734198 --- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java --- @@ -371,23 +371,29 @@ private void assertErrorsContain(Set errors, Set contains) { private void assertErrorAndClosed(RpcResult result, String expectedError) { assertTrue("unexpected success: " + result.successMessages, result.successMessages.isEmpty()); -// we expect 1 additional error, which contains *either* "closed" or "Connection reset" +// we expect 1 additional error, which should contain one of the following messages: +// - "closed" +// - "Connection reset" +// - "java.nio.channels.ClosedChannelException" Set errors = result.errorMessages; assertEquals("Expected 2 errors, got " + errors.size() + "errors: " + errors, 2, errors.size()); Set containsAndClosed = Sets.newHashSet(expectedError); containsAndClosed.add("closed"); containsAndClosed.add("Connection reset"); +containsAndClosed.add("java.nio.channels.ClosedChannelException"); Pair, Set> r = checkErrorsContain(errors, containsAndClosed); -Set errorsNotFound = r.getRight(); -assertEquals(1, errorsNotFound.size()); -String err = errorsNotFound.iterator().next(); -assertTrue(err.equals("closed") || err.equals("Connection reset")); +assertTrue("Got a non-empty set " + r.getLeft(), r.getLeft().isEmpty()); --- End diff -- Moved this check here so that we can see which error caused the test failure.
[GitHub] spark issue #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegrationSuite....
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/23041 cc @squito
[GitHub] spark pull request #23041: [SPARK-26069][TESTS]Fix flaky test: RpcIntegratio...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/23041

[SPARK-26069][TESTS] Fix flaky test: RpcIntegrationSuite.sendRpcWithStreamFailures

## What changes were proposed in this pull request?

The test failure is because `assertErrorAndClosed` misses one possible error message: `java.nio.channels.ClosedChannelException`. This happens when the second `uploadStream` is called after the channel has been closed. This can be reproduced by adding `Thread.sleep(1000)` below this line: https://github.com/apache/spark/blob/03306a6df39c9fd6cb581401c13c4dfc6bbd632e/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java#L217

This PR fixes the above issue and also improves the test failure messages of `assertErrorAndClosed`.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-26069

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23041.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23041

commit 6bebcb5e004ed4b434c550d26ed1a922d13e0446
Author: Shixiong Zhu
Date: 2018-11-15T07:16:00Z

    fix test
[GitHub] spark issue #23023: [SPARK-26042][SS][TESTS]Fix a potential hang in KafkaCon...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/23023 Thanks! Merging to master and 2.4.
[GitHub] spark issue #23023: [SPARK-26042][SS][TESTS]Fix a potential hang in KafkaCon...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/23023 cc @jose-torres
[GitHub] spark pull request #23023: [SPARK-26042][SS][TESTS]Fix a potential hang in K...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/23023

[SPARK-26042][SS][TESTS] Fix a potential hang in KafkaContinuousSourceTopicDeletionSuite

## What changes were proposed in this pull request?

Because initializing lazy vals shares the same lock, a thread trying to initialize `executedPlan` while `isRDD` is running will hang forever. This PR just materializes `executedPlan` so that accessing it while `toRdd` is running doesn't need to wait for a lock.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark SPARK-26042

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23023.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23023

commit 33b51136e6292db8236c6d39d662887d18a9534d
Author: Shixiong Zhu
Date: 2018-11-13T17:42:08Z

    Fix a potential hang in KafkaContinuousSourceTopicDeletionSuite
[GitHub] spark issue #22952: [SPARK-20568][SS] Rename files which are completed in pr...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22952

> Provide additional option: delete (two options - 'rename' / 'delete' - are mutually exclusive)
>
> Actually the actions end users are expected to take are 1. moving to archive directory (with compression or not) 2. delete periodically. If moving/renaming require non-trivial cost, end users may want to just delete files directly without backing up.

+1 for this approach. The file listing cost is huge when the directory has a lot of files, and I think one of the goals of this feature is reducing that cost. Hence either deleting the files or moving them to a different directory should be fine. Also, could you try to make one simple option for `rename/delete`, such as `cleanSource` -> (`none`, `rename` or `delete`)? When the user picks `rename`, they should be able to set the archive directory using another option. In addition, it would be great if we could document that whenever this option is used, the same directory should not be used by multiple queries.
[GitHub] spark issue #22923: [SPARK-25910][CORE] accumulator updates from previous st...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22923 We need to always update user accumulators. Right now such task metrics just cause some annoying error logs, so this seems not worth fixing.
[GitHub] spark pull request #22910: [SPARK-25899][TESTS]Fix flaky CoarseGrainedSchedu...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22910

[SPARK-25899][TESTS] Fix flaky CoarseGrainedSchedulerBackendSuite

## What changes were proposed in this pull request?

I saw CoarseGrainedSchedulerBackendSuite fail in my PR and finally reproduced the following error on a very busy machine:

```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 400 times over 10.00982864399 seconds. Last failure message: ArrayBuffer("2", "0", "3") had length 3 instead of expected length 4.
```

The logs in this test show executor 1 was not up when the test failed.

```
18/10/30 11:34:03.563 dispatcher-event-loop-12 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.17.0.2:43656) with ID 2
18/10/30 11:34:03.593 dispatcher-event-loop-3 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.17.0.2:43658) with ID 3
18/10/30 11:34:03.629 dispatcher-event-loop-6 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.17.0.2:43654) with ID 0
18/10/30 11:34:03.885 pool-1-thread-1-ScalaTest-running-CoarseGrainedSchedulerBackendSuite INFO CoarseGrainedSchedulerBackendSuite: = FINISHED o.a.s.scheduler.CoarseGrainedSchedulerBackendSuite: 'compute max number of concurrent tasks can be launched' =
```

And the following logs in executor 1 show it was still doing its initialization when the timeout happened (at 18/10/30 11:34:03.885):

```
18/10/30 11:34:03.463 netty-rpc-connection-0 INFO TransportClientFactory: Successfully created connection to 54b6b6217301/172.17.0.2:33741 after 37 ms (0 ms spent in bootstraps)
18/10/30 11:34:03.959 main INFO DiskBlockManager: Created local directory at /home/jenkins/workspace/core/target/tmp/spark-383518bc-53bd-4d9c-885b-d881f03875bf/executor-61c406e4-178f-40a6-ac2c-7314ee6fb142/blockmgr-03fb84a1-eedc-4055-8743-682eb3ac5c67
18/10/30 11:34:03.993 main INFO MemoryStore: MemoryStore started with capacity 546.3 MB
```

Hence, I think our current 10 seconds is not enough on a slow Jenkins machine. This PR just increases the timeout from 10 seconds to 60 seconds to make the test more stable.

## How was this patch tested?

Jenkins

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zsxwing/spark fix-flaky-test

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22910.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22910

commit 0a3095fd5610810004ef6a0d1e02581b78bfdea4
Author: Shixiong Zhu
Date: 2018-10-30T22:08:26Z

    Fix flaky CoarseGrainedSchedulerBackendSuite
[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22771 Thanks! Merging to master.
[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22771 @markhamstra any further comments?
[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22771#discussion_r228409787 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler( if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) cleanupStateForJobAndIndependentStages(job) +try { + // killAllTaskAttempts will fail if a SchedulerBackend does not implement + // killTask. + logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " + +s"zombie tasks for this job") --- End diff -- I created https://issues.apache.org/jira/browse/SPARK-25849 to improve the document.
[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22771 retest this please
[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22771#discussion_r228355772 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler( if (job.numFinished == job.numPartitions) { markStageAsFinished(resultStage) cleanupStateForJobAndIndependentStages(job) +try { + // killAllTaskAttempts will fail if a SchedulerBackend does not implement + // killTask. + logInfo(s"Job ${job.jobId} is finished. Killing potential speculative or " + +s"zombie tasks for this job") --- End diff -- How about `... is finished. Cancelling ...`? This should be consistent with other places.
[GitHub] spark issue #22816: [SPARK-25822][PySpark]Fix a race condition when releasin...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22816 retest this please
[GitHub] spark pull request #22816: [SPARK-25822][PySpark]Fix a race condition when r...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22816#discussion_r228262084 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -114,7 +114,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( context.addTaskCompletionListener[Unit] { _ => writerThread.shutdownOnTaskCompletion() - if (!reuseWorker || !released.get) { + if (!reuseWorker || released.compareAndSet(false, true)) { --- End diff -- Addressed
[GitHub] spark issue #22816: [SPARK-25822][PySpark]Fix a race condition when releasin...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22816 cc @ueshin
[GitHub] spark pull request #22816: [SPARK-25822][PySpark]Fix a race condition when r...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22816 [SPARK-25822][PySpark]Fix a race condition when releasing a Python worker ## What changes were proposed in this pull request? There is a race condition when releasing a Python worker. If `ReaderIterator.handleEndOfDataSection` is not running in the task thread, when a task is terminated early (such as `take(N)`), the task completion listener may close the worker but `handleEndOfDataSection` can still put the worker back into the worker pool for reuse. https://github.com/zsxwing/spark/commit/0e07b483d2e7c68f3b5c3c118d0bf58c501041b7 is a patch to reproduce this issue. I also found a user report of this in the mailing list: http://mail-archives.apache.org/mod_mbox/spark-user/201610.mbox/%3CCAAUq=h+yluepd23nwvq13ms5hostkhx3ao4f4zqv6sgo5zm...@mail.gmail.com%3E This PR fixes the issue by using `compareAndSet` to make sure we will never return a closed worker to the worker pool. ## How was this patch tested? Jenkins. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark fix-socket-closed Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22816.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22816 commit a22e38917b4f2893ce0d72febed4df1d3eb9fdd5 Author: Shixiong Zhu Date: 2018-10-24T07:51:26Z Fix a race condition when releasing a Python worker
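The fix pattern described above, an atomic compare-and-set guarding a one-shot release, can be sketched as follows. The names (`Worker`, `try_release`) are illustrative, not Spark's actual API, and the `AtomicBoolean`-style flag is built from a plain lock:

```python
import threading

class ReleasedFlag:
    """A minimal compare-and-set boolean, mimicking Java's AtomicBoolean."""
    def __init__(self):
        self._lock = threading.Lock()
        self._value = False

    def compare_and_set(self, expect, update):
        # Atomically: if the flag equals `expect`, set it to `update`.
        with self._lock:
            if self._value == expect:
                self._value = update
                return True
            return False

class Worker:
    """Hypothetical worker handle, for illustration only."""
    def __init__(self):
        self.released = ReleasedFlag()
        self.release_count = 0

    def try_release(self):
        # Both the task-completion listener and handleEndOfDataSection may race
        # here; compare_and_set guarantees exactly one caller wins, so a closed
        # worker can never be handed back to the pool a second time.
        if self.released.compare_and_set(False, True):
            self.release_count += 1  # return the worker to the pool exactly once
            return True
        return False

w = Worker()
print(w.try_release(), w.try_release(), w.release_count)  # True False 1
```

The key property is that checking the flag and flipping it happen as one atomic step, unlike the original `!released.get` check, which left a window between read and write.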
[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22771 I agree that the task reaper is a big change to the story and we should reconsider SPARK-17064. Could we move the discussion to SPARK-17064? By the way, regarding this PR itself, @tgravescs @markhamstra any further comments?
[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22771 @tgravescs Yeah, it looks like https://issues.apache.org/jira/browse/SPARK-24622 is better, but it may take more time than this one, since this PR is smaller and less risky.
[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22771#discussion_r227060028 --- Diff: core/src/test/scala/org/apache/spark/SparkContextSuite.scala --- @@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0) } } + + test("cancel zombie tasks in a result stage when the job finishes") { +val conf = new SparkConf() + .setMaster("local-cluster[1,2,1024]") + .setAppName("test-cluster") + .set("spark.ui.enabled", "false") + // Disable this so that if a task is running, we can make sure the executor will always send + // task metrics via heartbeat to driver. + .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false") + // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast + .set("spark.executor.heartbeatInterval", "1s") +sc = new SparkContext(conf) +sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true") +@volatile var runningTaskIds: Seq[Long] = null +val listener = new SparkListener { + override def onExecutorMetricsUpdate( + executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { +if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) { + runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1) +} + } +} +sc.addSparkListener(listener) +sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) => + val context = org.apache.spark.TaskContext.get() + if (context.stageAttemptNumber == 0) { +if (context.partitionId == 0) { + // Make the first task in the first stage attempt fail. + throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0, +new java.io.IOException("fake")) +} else { + // Make the second task in the first stage attempt sleep to generate a zombie task + Thread.sleep(6) +} + } else { +// Make the second stage attempt successful. 
+ } + x +}.collect() +sc.listenerBus.waitUntilEmpty(1) +// As executors will send the metrics of running tasks via heartbeat, we can use this to check +// whether there is any running task. --- End diff -- I prefer this way to make sure the executor did receive the kill command and interrupt the tasks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22770: [SPARK-25771][PYSPARK]Fix improper synchronizatio...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22770#discussion_r227056168 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -31,15 +32,15 @@ import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util.{RedirectThread, Utils} private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String]) - extends Logging { + extends Logging { self => import PythonWorkerFactory._ // Because forking processes from Java is expensive, we prefer to launch a single Python daemon, // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon // currently only works on UNIX-based systems now because it uses signals for child management, // so we can also fall back to launching workers, pyspark/worker.py (by default) directly. - val useDaemon = { + private val useDaemon = { --- End diff -- Fixing these since I'm touching a lot of fields in this file.
[GitHub] spark issue #22770: [SPARK-25771][PYSPARK]Fix improper synchronization in Py...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22770 Thanks for reviewing this. Merging to master.
[GitHub] spark issue #22771: [SPARK-25773][Core]Cancel zombie tasks in a result stage...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22771 cc @squito @tgravescs @jiangxb1987 @kayousterhout
[GitHub] spark pull request #22771: [SPARK-25773][Core]Cancel zombie tasks in a resul...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22771 [SPARK-25773][Core]Cancel zombie tasks in a result stage when the job finishes ## What changes were proposed in this pull request? When a job finishes, there may be some zombie tasks still running due to stage retry. Since a result stage will never be used by other jobs, running these tasks just wastes cluster resources. This PR just asks TaskScheduler to cancel the running tasks of a result stage when it's already finished. Credits go to @srinathshankar who suggested this idea to me. This PR also fixes two minor issues while I'm touching DAGScheduler: - An invalid spark.job.interruptOnCancel value should not crash DAGScheduler. - Non-fatal errors should not crash DAGScheduler. ## How was this patch tested? The new unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-25773 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22771.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22771 commit 581ea53b57cc9fc0e89f2d635422653cfdfcb27f Author: Shixiong Zhu Date: 2018-10-16T22:07:04Z Cancel zombie tasks in a result stage when the job finishes
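The core idea of the PR, cancel still-running attempts once the job has every result partition it needs, can be sketched with a thread pool and a cancellation flag. Names are illustrative, not Spark's scheduler API; the flag plays the role of `killAllTaskAttempts`:

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

cancel = threading.Event()  # stands in for the "kill remaining attempts" signal

def fast_task(i):
    # a task attempt that finishes promptly and contributes a needed partition
    return i

def zombie_task():
    # a leftover attempt from a retried stage: it would run for a long time
    # unless the finished job tells it to stop
    cancel.wait(timeout=60)
    return "stopped early" if cancel.is_set() else "ran to completion"

with ThreadPoolExecutor(max_workers=3) as pool:
    needed = [pool.submit(fast_task, i) for i in range(2)]
    zombie = pool.submit(zombie_task)
    wait(needed)            # the job is finished: all needed partitions computed
    cancel.set()            # cancel zombie attempts for the finished result stage
    print(zombie.result())  # stopped early
```

Without the cancellation step, the zombie attempt would hold a pool slot for its full duration even though no job can ever use its output, which is exactly the wasted-resource scenario the PR targets.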
[GitHub] spark pull request #22770: [SPARK-25771][PYSPARK]Fix improper synchronizatio...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22770#discussion_r226459546 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -278,7 +289,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String override def run() { while (true) { -synchronized { +self.synchronized { --- End diff -- this is fix 1
[GitHub] spark pull request #22770: [SPARK-25771][PYSPARK]Fix improper synchronizatio...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22770#discussion_r226459609 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala --- @@ -163,7 +172,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { val socket = serverSocket.accept() authHelper.authClient(socket) -simpleWorkers.put(socket, worker) +self.synchronized { --- End diff -- this is fix 2.
[GitHub] spark pull request #22770: [SPARK-25771][PYSPARK]Fix improper synchronizatio...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22770 [SPARK-25771][PYSPARK]Fix improper synchronization in PythonWorkerFactory ## What changes were proposed in this pull request? Fix the following issues in PythonWorkerFactory: - `MonitorThread.run` uses a wrong lock. - `createSimpleWorker` misses `synchronized` when updating `simpleWorkers`. Other changes are just to improve the code style and make the thread-safety contract clear. ## How was this patch tested? You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark pwf Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22770.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22770 commit 0369de2323640377c0f990bb47ebe112654ca498 Author: Shixiong Zhu Date: 2018-10-18T20:39:04Z fix improper synchronization
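The first bug, synchronizing on the wrong object, is easy to reproduce outside Spark. In Scala, a bare `synchronized` inside an inner class locks the inner object rather than the enclosing factory (hence the `self.synchronized` fix). The Python sketch below models the same mistake by guarding shared state with an unrelated lock; all names are illustrative:

```python
import threading

class Factory:
    """Stand-in for an object whose state is guarded by its own lock."""
    def __init__(self):
        self.lock = threading.Lock()  # the factory's monitor ("self" in Scala)

    def monitor_step_wrong(self):
        # Bug pattern: the monitor thread grabs an unrelated lock, like a bare
        # `synchronized` in an inner class locking the inner object instead of
        # the enclosing factory. Holding it provides no mutual exclusion at all:
        wrong_lock = threading.Lock()
        with wrong_lock:
            got = self.lock.acquire(blocking=False)  # another thread could do this too
            if got:
                self.lock.release()
            return got  # True: the factory's state was NOT protected

    def monitor_step_right(self):
        # Fix pattern: lock the enclosing instance (Scala: `self.synchronized`).
        with self.lock:
            got = self.lock.acquire(blocking=False)  # a would-be second owner fails
            if got:
                self.lock.release()
            return got  # False: the factory's lock really is held

f = Factory()
print(f.monitor_step_wrong())  # True  -> no exclusion with the wrong lock
print(f.monitor_step_right())  # False -> exclusion with the right lock
```

The non-blocking acquire makes the difference observable deterministically: under the wrong lock the factory's monitor is still free, so any concurrent code path could mutate the supposedly protected state.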
[GitHub] spark issue #22692: [SPARK-25598][STREAMING][BUILD][test-maven] Remove flume...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22692 LGTM
[GitHub] spark pull request #22173: [SPARK-24355] Spark external shuffle server impro...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22173#discussion_r223513440 --- Diff: common/network-common/src/main/java/org/apache/spark/network/TransportContext.java --- @@ -77,17 +82,54 @@ private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE; private static final MessageDecoder DECODER = MessageDecoder.INSTANCE; + // Separate thread pool for handling ChunkFetchRequest. This helps to enable throttling + // max number of TransportServer worker threads that are blocked on writing response + // of ChunkFetchRequest message back to the client via the underlying channel. + private static EventLoopGroup chunkFetchWorkers; --- End diff -- Is there any special reason that this must be a global one? I have not yet looked at the details, but it looks like this may make ChunkFetchIntegrationSuite flaky, as there is no isolation between tests.
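The isolation concern raised here can be shown with a tiny sketch: a static (process-global) pool is created once and shared by every instance, so tests that saturate or shut it down interfere with each other, while per-instance pools stay isolated. Names are illustrative, not Netty's or Spark's actual API:

```python
from concurrent.futures import ThreadPoolExecutor

class TransportCtx:
    """Sketch of global vs. per-instance worker pools."""

    # Class-level (global) pool: one object shared by every instance,
    # so state leaks across otherwise independent contexts/tests.
    shared_workers = ThreadPoolExecutor(max_workers=2)

    def __init__(self):
        # Per-instance pool: each context owns it and can safely shut it
        # down without affecting anyone else.
        self.own_workers = ThreadPoolExecutor(max_workers=2)

a, b = TransportCtx(), TransportCtx()
print(a.shared_workers is b.shared_workers)  # True: one global pool
print(a.own_workers is b.own_workers)        # False: isolated per instance

a.own_workers.shutdown()
b.own_workers.shutdown()
TransportCtx.shared_workers.shutdown()
```

This is the trade-off under discussion: a global pool caps total threads across all servers in the JVM, but it also removes the isolation that test suites usually rely on.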
[GitHub] spark issue #22628: [SPARK-25641] Change the spark.shuffle.server.chunkFetch...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22628 Is this supposed to fix the flaky ChunkFetchIntegrationSuite? http://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.network.ChunkFetchIntegrationSuite&test_name=fetchFileChunk
[GitHub] spark issue #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch, python...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22627 LGTM
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r223490770 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,214 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream().foreachBatch( + new VoidFunction2, Long> { +public void call(Dataset dataset, Long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epoch_id): +# Transform and write batchDF +pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With `foreachBatch`, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using `foreachBatch`, you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.persist() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.unpersist() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using `foreachBatch`, you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or +continuous processing mode), then you can express you custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r223490811 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,214 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream().foreachBatch( + new VoidFunction2, Long> { +public void call(Dataset dataset, Long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epoch_id): +# Transform and write batchDF +pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With `foreachBatch`, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using `foreachBatch`, you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.persist() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.unpersist() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using `foreachBatch`, you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or +continuous processing mode), then you can express you custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r223491087 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,214 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream().foreachBatch( + new VoidFunction2, Long> { +public void call(Dataset dataset, Long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epoch_id): --- End diff -- nit: `foreachBatchFunction` -> `foreach_batch_function` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r223491101 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,214 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream().foreachBatch( + new VoidFunction2, Long> { +public void call(Dataset dataset, Long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epoch_id): +# Transform and write batchDF +pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() --- End diff -- nit: `foreachBatchFunction` -> `foreach_batch_function` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r223481016 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or +continuous processing mode), then you can express your custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend the class `For
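The note above about using the batch ID to get an exactly-once guarantee can be sketched without Spark. This is a plain-Python illustration of the idempotent-sink pattern under at-least-once delivery; `committed_batches`, `sink`, and `write_batch` are hypothetical stand-ins, not Spark APIs (in a real `foreachBatch` sink the committed-ID set would live in durable, transactional storage alongside the data):

```python
# Idempotent sink sketch: skip batches whose ID was already committed,
# so a replayed micro-batch does not duplicate output rows.
committed_batches = set()
sink = []

def write_batch(rows, batch_id):
    """Write rows exactly once per batch_id, even if the batch is replayed."""
    if batch_id in committed_batches:
        return  # replay after a failure: this batch was already written
    sink.extend(rows)
    committed_batches.add(batch_id)

write_batch(["a", "b"], batch_id=0)
write_batch(["a", "b"], batch_id=0)  # at-least-once replay of batch 0
write_batch(["c"], batch_id=1)
print(sink)  # ['a', 'b', 'c'] - the replay did not duplicate rows
```

The same idea transfers to `foreachBatch`: check the batch ID against your sink's committed state at the start of the function, and commit the ID atomically with the data.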
[GitHub] spark issue #22649: [SPARK-25644][SS][FOLLOWUP][BUILD] Fix Scala 2.12 build ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22649 > and this only becomes ambiguous in 2.12 (long story). Yeah, I'm curious why it didn't fail in 2.12 before. I know there are several Scala features that don't work well with overloading. This may be one of them.
[GitHub] spark issue #22649: [SPARK-25644][SS][FOLLOWUP][BUILD] Fix Scala 2.12 build ...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22649 Thanks for fixing this. I'm just curious why it didn't fail before my change from `Long` to `java.lang.Long`?
[GitHub] spark issue #22647: [SPARK-25655] [BUILD] Add Pspark-ganglia-lgpl to the sca...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22647 LGTM pending tests
[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22633 Thanks. Merging to master and 2.4.
[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22633 Looks like `lint-java` doesn't catch any style issues in my PR
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222813906 --- Diff: docs/structured-streaming-programming-guide.md --- (same foreachBatch documentation diff as quoted in the first #22627 comment above, ending at `batchDF.uncache()` in the multi-location write outline) --- End diff -- `uncache()` -> `unpersist()`
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222815111 --- Diff: docs/structured-streaming-programming-guide.md --- (same foreachBatch documentation diff as quoted in the first #22627 comment above; the review comment itself was truncated in this archive)
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222815845 --- Diff: docs/structured-streaming-programming-guide.md --- (same foreachBatch documentation diff as quoted in the first #22627 comment above; the review comment itself was truncated in this archive)
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222812874 --- Diff: docs/structured-streaming-programming-guide.md --- (same foreachBatch documentation diff as quoted in the first #22627 comment above, ending at the Java `void call(Dataset dataset, long batchId) {` line) --- End diff -- ditto
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222815770 --- Diff: docs/structured-streaming-programming-guide.md --- (same foreachBatch documentation diff as quoted in the first #22627 comment above; the review comment itself was truncated in this archive)
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222812936 --- Diff: docs/structured-streaming-programming-guide.md --- (same foreachBatch documentation diff as quoted in the first #22627 comment above, ending at `streamingDatasetOfString.writeStream.foreachBatch(`) --- End diff -- nit: `writeStream()`
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222815072 --- Diff: docs/structured-streaming-programming-guide.md --- (same foreachBatch documentation diff as quoted in the first #22627 comment above; the review comment itself was truncated in this archive)
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222818441 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat + +## Recovery Semantics after Changes in a Streaming Query +There are limitations on what changes in a streaming query are allowed between restarts from the +same checkpoint location. Here are a few kinds of changes that are either not allowed, or +the effect of the change is not well-defined. For all of them: + +- The term *allowed* means you can do the specified change but whether the semantics of its effect + is well-defined depends on the query and the change. + +- The term *not allowed* means you should not do the specified change as the restarted query is likely + to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset + generated with sparkSession.readStream. + +**Types of changes** + +- *Changes in the number or type (i.e. different source) of input sources*: This is not allowed. + +- *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics + of the change are well-defined depends on the source and the query. Here are a few examples. + + - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)` + + - Changes to subscribed topics/files is generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")` + +- *Changes in the type of output sink*: Changes between a few specific combinations of sinks + are allowed. This needs to be verified on a case-by-case basis. Here are a few examples. 
+ + - File sink to Kafka sink is allowed. Kafka will see only the new data. + + - Kafka sink to file sink is not allowed. + + - Kafka sink changed to foreach, or vice versa is allowed. + +- *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of + the change are well-defined depends on the sink and the query. Here are a few examples. + + - Changes to output directory of a file sink is not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")` + + - Changes to output topic is allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("path", "anotherTopic")` + + - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) is allowed, but the semantics of the change depends on the code. + +- *Changes in projection / filter / map-like operations**: Some cases are allowed. For example: + + - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`. + + - Changes in projections with same output schema is allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`. --- End diff -- this example changes the schema. Right? From `string` to `struct`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222817163 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2<Dataset<String>, long> { +void call(Dataset<String> dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, a corresponding batch data writer does not exist, or +continuous processing mode), then you can express your custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend the class `For
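The at-least-once note in the quoted docs suggests using the batchId to deduplicate output. As a minimal, Spark-free sketch of that idea (the `IdempotentSink` class and its methods are hypothetical, invented purely to illustrate the pattern), a sink can record which batch IDs it has already committed and ignore redeliveries:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sink illustrating batchId-based deduplication: retries of a
// micro-batch replay the same batchId, so recording committed IDs turns
// at-least-once delivery into an effectively exactly-once write.
public class IdempotentSink {
    private final Set<Long> committedBatches = new HashSet<>();
    private int rowsWritten = 0;

    public void writeBatch(long batchId, List<String> rows) {
        if (!committedBatches.add(batchId)) {
            return; // batch already committed; skip the duplicate delivery
        }
        rowsWritten += rows.size();
    }

    public int rowsWritten() { return rowsWritten; }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.writeBatch(0, List.of("a", "b"));
        sink.writeBatch(0, List.of("a", "b")); // redelivery of batch 0 is ignored
        sink.writeBatch(1, List.of("c"));
        System.out.println(sink.rowsWritten()); // prints 3
    }
}
```

In a real query the set of committed batch IDs would itself have to live in transactional or fault-tolerant storage; the in-memory set here only illustrates the control flow.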
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222812757 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. + + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2<Dataset<String>, long> { --- End diff -- long -> Long. I noticed the current Java API actually is wrong. Submitted https://github.com/apache/spark/pull/22633 to fix it. 
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222818615 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat + +## Recovery Semantics after Changes in a Streaming Query +There are limitations on what changes in a streaming query are allowed between restarts from the +same checkpoint location. Here are a few kinds of changes that are either not allowed, or +the effect of the change is not well-defined. For all of them: + +- The term *allowed* means you can do the specified change but whether the semantics of its effect + is well-defined depends on the query and the change. + +- The term *not allowed* means you should not do the specified change as the restarted query is likely + to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset + generated with sparkSession.readStream. + +**Types of changes** + +- *Changes in the number or type (i.e. different source) of input sources*: This is not allowed. + +- *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics + of the change are well-defined depends on the source and the query. Here are a few examples. + + - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)` + + - Changes to subscribed topics/files is generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")` + +- *Changes in the type of output sink*: Changes between a few specific combinations of sinks + are allowed. This needs to be verified on a case-by-case basis. Here are a few examples. 
+ + - File sink to Kafka sink is allowed. Kafka will see only the new data. + + - Kafka sink to file sink is not allowed. + + - Kafka sink changed to foreach, or vice versa is allowed. + +- *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of + the change are well-defined depends on the sink and the query. Here are a few examples. + + - Changes to output directory of a file sink is not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")` + + - Changes to output topic is allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("path", "anotherTopic")` + + - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) is allowed, but the semantics of the change depends on the code. + +- *Changes in projection / filter / map-like operations**: Some cases are allowed. For example: + + - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`. + + - Changes in projections with same output schema is allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.select(to_json(...).as("json")).writeStream`. + + - Changes in projections with different output schema are conditionally allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` is allowed only if the output sink allows the schema change from `"a"` to `"b"`. + +- *Changes in stateful operations*: Some operations in streaming queries need to maintain + state data in order to continuously update the result. Structured Streaming automatically checkpoints + the state data to fault-tolerant storage (for example, DBFS, AWS S3, Azure Blob storage) and restores it after restart. --- End diff -- remove `DBFS`? 
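The quoted passage on stateful operations says Structured Streaming checkpoints state data to fault-tolerant storage and restores it after restart. As a toy, Spark-free sketch of that recovery idea (class and method names invented for illustration, not Spark's implementation), a running word count can snapshot its state after a batch so a "restarted" instance resumes from the snapshot instead of from zero:

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch only: why checkpointing state matters for a streaming
// aggregation. A running count snapshots its state after each batch; a
// restarted instance restores the snapshot and continues incrementally.
public class StateCheckpointDemo {
    static Map<String, Integer> processBatch(Map<String, Integer> state, String[] words) {
        for (String w : words) state.merge(w, 1, Integer::sum);
        return state;
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        processBatch(state, new String[] {"a", "b", "a"});
        // Snapshot the state (stands in for a write to fault-tolerant storage).
        Map<String, Integer> checkpoint = new HashMap<>(state);

        // "Restart": restore from the checkpoint, then process the next batch.
        Map<String, Integer> restored = new HashMap<>(checkpoint);
        processBatch(restored, new String[] {"a"});
        System.out.println(restored.get("a")); // prints 3
    }
}
```

Without the restore step, the restarted instance would report `a = 1` for the second batch, which is exactly the wrong answer the checkpoint mechanism exists to prevent.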
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222818119 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -2709,6 +2935,78 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat + +## Recovery Semantics after Changes in a Streaming Query +There are limitations on what changes in a streaming query are allowed between restarts from the +same checkpoint location. Here are a few kinds of changes that are either not allowed, or +the effect of the change is not well-defined. For all of them: + +- The term *allowed* means you can do the specified change but whether the semantics of its effect + is well-defined depends on the query and the change. + +- The term *not allowed* means you should not do the specified change as the restarted query is likely + to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset + generated with sparkSession.readStream. + +**Types of changes** + +- *Changes in the number or type (i.e. different source) of input sources*: This is not allowed. + +- *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics + of the change are well-defined depends on the source and the query. Here are a few examples. + + - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)` + + - Changes to subscribed topics/files is generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")` + +- *Changes in the type of output sink*: Changes between a few specific combinations of sinks + are allowed. This needs to be verified on a case-by-case basis. Here are a few examples. 
+ + - File sink to Kafka sink is allowed. Kafka will see only the new data. + + - Kafka sink to file sink is not allowed. + + - Kafka sink changed to foreach, or vice versa is allowed. + +- *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of + the change are well-defined depends on the sink and the query. Here are a few examples. + + - Changes to output directory of a file sink is not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")` + + - Changes to output topic is allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("path", "anotherTopic")` --- End diff -- nit: `path` -> `topic` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22627: [SPARK-25639] [DOCS] Added docs for foreachBatch,...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22627#discussion_r222815089 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates")) -# Using Foreach -The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter` -([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs), -which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points. +# Using Foreach and ForeachBatch +The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and writing +logic on the output of a streaming query. They have slightly different use cases - while `foreach` +allows custom write logic on every row, `foreachBatch` allows arbitrary operations +and custom logic on the output of each micro-batch. Let's understand their usages in more detail. + +## ForeachBatch +`foreachBatch(...)` allows you to specify a function that is executed on +the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. +It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch. 
+ + + + +{% highlight scala %} +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + // Transform and write batchDF +}.start() +{% endhighlight %} + + + + +{% highlight java %} +streamingDatasetOfString.writeStream.foreachBatch( + new VoidFunction2, long> { +void call(Dataset dataset, long batchId) { + // Transform and write batchDF +} + } +).start(); +{% endhighlight %} + + + + +{% highlight python %} +def foreachBatchFunction(df, epochId): + # Transform and write batchDF + pass + +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start() +{% endhighlight %} + + + +R is not yet supported. + + + +With foreachBatch, you can do the following. + +- **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, + but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch + data writers on the output of each micro-batch. +- **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, + then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can + cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, + you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline. + +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => + batchDF.cache() + batchDF.write.format(...).save(...) // location 1 + batchDF.write.format(...).save(...) // location 2 + batchDF.uncache() +} + +- **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported + in streaming DataFrames because Spark does not support generating incremental plans in those cases. + Using foreachBatch() you can apply some of these operations on each micro-batch output. 
However, you will have to reason about the end-to-end semantics of doing that operation yourself. + +**Note:** +- By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the + batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee. +- `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the + micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead. + + +## Foreach +If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or +continuous processing mode), then you can express your custom writer logic using `foreach`. +Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`. +Since Spark 2.4, `foreach` is available in Scala, Java and Python. + + + + +In Scala, you have to extend the class `For
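The at-least-once note above suggests using the batchId to make writes idempotent. The following is a minimal, self-contained sketch of that pattern; the `IdempotentSink` class and its fields are illustrative, not part of Spark's API:

```scala
import scala.collection.mutable

// Hypothetical sink that remembers the last committed batch id, so a retried
// micro-batch (delivered again with the same batchId) is written only once.
class IdempotentSink {
  private var lastCommittedBatchId: Long = -1L
  val written = mutable.ArrayBuffer.empty[(Long, Seq[String])]

  // Same shape as a foreachBatch function: (batch, batchId) => Unit
  def write(batch: Seq[String], batchId: Long): Unit = {
    if (batchId > lastCommittedBatchId) {
      written += ((batchId, batch))
      lastCommittedBatchId = batchId
    } // else: batch already committed; drop the duplicate delivery
  }
}

val sink = new IdempotentSink
sink.write(Seq("a", "b"), 0L)
sink.write(Seq("a", "b"), 0L) // a retry of batch 0 is deduplicated
sink.write(Seq("c"), 1L)
```

In a real query the last committed batch id would live in the external store (e.g. as part of a transactional write), not in driver memory, so it survives restarts.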
[GitHub] spark pull request #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStr...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22633 [SPARK-25644][SS]Fix java foreachBatch in DataStreamWriter ## What changes were proposed in this pull request? The java `foreachBatch` API in `DataStreamWriter` should accept `java.lang.Long` rather than `scala.Long`. ## How was this patch tested? New java test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark fix-java-foreachbatch Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22633.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22633 commit aed8c2457f8abff5b50d579dc3a4938cc36c5be1 Author: Shixiong Zhu Date: 2018-10-04T20:12:25Z fix java foreachBatch --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22633: [SPARK-25644][SS]Fix java foreachBatch in DataStreamWrit...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22633 cc @tdas
[GitHub] spark pull request #22586: [SPARK-25568][Core]Continue to update the remaini...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22586#discussion_r221438766 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -1880,6 +1880,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(sc.parallelize(1 to 10, 2).count() === 10) } + test("misbehaved accumulator should not impact other accumulators") { --- End diff -- > Also verify the log message? That's not in the core project.
[GitHub] spark issue #22473: [SPARK-25449][CORE] Heartbeat shouldn't include accumula...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22473 Thanks! Merging to master.
[GitHub] spark pull request #22586: [SPARK-25568][Core]Continue to update the remaini...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22586 [SPARK-25568][Core]Continue to update the remaining accumulators when failing to update one accumulator ## What changes were proposed in this pull request? Since we don't fail a job when `AccumulatorV2.merge` fails, we should try to update the remaining accumulators so that they can still report correct values. ## How was this patch tested? The new unit test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-25568 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22586.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22586 commit badb4711caff6e3d10ba9037f71c1ad6515577e8 Author: Shixiong Zhu Date: 2018-09-28T20:52:07Z continue to update the remaining accumulators
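The idea behind this fix can be sketched outside Spark as follows; `Acc` and `mergeAll` are illustrative stand-ins for `AccumulatorV2` and the driver-side merge loop, not the real classes:

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Stand-in for AccumulatorV2: `merge` may throw for a misbehaving accumulator.
class Acc(val name: String, failOnMerge: Boolean = false) {
  var value: Long = 0L
  def merge(update: Long): Unit = {
    if (failOnMerge) throw new IllegalStateException(s"merge failed for $name")
    value += update
  }
}

// The fix's idea: catch the failure per accumulator and keep going, so the
// remaining accumulators still receive their updates.
def mergeAll(accums: Seq[Acc], updates: Seq[Long]): Seq[String] = {
  val failed = mutable.ArrayBuffer.empty[String]
  accums.zip(updates).foreach { case (acc, update) =>
    try acc.merge(update)
    catch { case NonFatal(_) => failed += acc.name } // log-and-continue
  }
  failed.toSeq
}

val accs = Seq(new Acc("a"), new Acc("bad", failOnMerge = true), new Acc("c"))
val failedNames = mergeAll(accs, Seq(1L, 2L, 3L))
```

With a bare `foreach` and no per-element catch, the exception from "bad" would abort the loop and "c" would never be updated; the try/catch confines the damage to the one misbehaving accumulator.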
[GitHub] spark issue #22473: [SPARK-25449][CORE] Heartbeat shouldn't include accumula...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22473 LGTM pending tests
[GitHub] spark issue #22473: [SPARK-25449][CORE] Heartbeat shouldn't include accumula...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22473 Looks like `org.apache.spark.deploy.history.HistoryServerSuite.executor list with executor metrics json` and `org.apache.spark.util.JsonProtocolSuite.SparkListenerEvent` are broken by the changes. Could you also fix them?
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r221313411 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -609,13 +609,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") -val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s") -val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") +val executorTimeoutThreshold = getTimeAsMs("spark.network.timeout", "120s") --- End diff -- Could you use `getTimeAsSeconds` and manually convert it to `ms`?
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r221022783 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -609,13 +609,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") -val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s") -val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") +val executorTimeoutThreshold = getTimeAsMs("spark.network.timeout", "120s") +val executorHeartbeatInterval = get(EXECUTOR_HEARTBEAT_INTERVAL) // If spark.executor.heartbeatInterval bigger than spark.network.timeout, // it will almost always cause ExecutorLostFailure. See SPARK-22754. require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " + - s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " + - s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.") + s"spark.network.timeout=${executorTimeoutThreshold}ms must be no less than the value of " + --- End diff -- nit: "ms" -> "s" once you address the above comment
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r221022651 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -609,13 +609,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") -val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s") -val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") +val executorTimeoutThreshold = getTimeAsMs("spark.network.timeout", "120s") --- End diff -- Could you change `getTimeAsMs` back to `getTimeAsSeconds`? There is a slight difference when the user doesn't specify the time unit. `getTimeAsMs` uses `ms` as default, while `getTimeAsSeconds` uses `seconds`.
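A simplified model of the default-unit difference being discussed; this is a sketch of the described semantics, not Spark's actual `SparkConf` parser, and it only handles "ms" and "s" suffixes:

```scala
import java.util.concurrent.TimeUnit

// When no unit suffix is given, getTimeAsMs assumes milliseconds while
// getTimeAsSeconds assumes seconds. That is the behavior change the review
// comment is warning about.
def parseToMillis(s: String, defaultUnit: TimeUnit): Long = {
  val t = s.trim.toLowerCase
  val (num, unit) =
    if (t.endsWith("ms")) (t.dropRight(2), TimeUnit.MILLISECONDS)
    else if (t.endsWith("s")) (t.dropRight(1), TimeUnit.SECONDS)
    else (t, defaultUnit) // bare number: fall back to the caller's default
  unit.toMillis(num.toLong)
}

def getTimeAsMs(s: String): Long = parseToMillis(s, TimeUnit.MILLISECONDS)
def getTimeAsSeconds(s: String): Long = parseToMillis(s, TimeUnit.SECONDS) / 1000
```

So a bare "120" means 120 ms to `getTimeAsMs` but 120 s to `getTimeAsSeconds`; swapping one call for the other silently changes the meaning of unsuffixed user configs, which is why the suggestion is to keep `getTimeAsSeconds` and convert manually.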
[GitHub] spark pull request #22553: [SPARK-25541][SQL] CaseInsensitiveMap should be s...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22553#discussion_r220718383 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala --- @@ -42,7 +42,11 @@ class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Ma override def iterator: Iterator[(String, T)] = keyLowerCasedMap.iterator override def -(key: String): Map[String, T] = { -new CaseInsensitiveMap(originalMap.filterKeys(!_.equalsIgnoreCase(key))) +new CaseInsensitiveMap(originalMap.filter(!_._1.equalsIgnoreCase(key))) + } + + override def filterKeys(p: (String) => Boolean): Map[String, T] = { --- End diff -- This method right now doesn't follow the Scaladoc: ``` * @return an immutable map consisting only of those key value pairs of this map where the key satisfies * the predicate `p`. The resulting map wraps the original map without copying any elements. ``` Maybe we should not add it. This is a Scala issue but they didn't fix it until 2.13 (https://github.com/scala/bug/issues/4776).
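The concern is that Scala's `Map#filterKeys` returns a lazy view, so the Scaladoc quoted above no longer holds for an eager override. A sketch of the trade-off, using an illustrative `CIMap` class rather than Spark's real `CaseInsensitiveMap`:

```scala
// Illustrative stand-in for CaseInsensitiveMap (not Spark's real class).
// An eager filterKeys materializes a new map, unlike the default lazy view,
// so it deliberately does NOT satisfy "wraps the original map without
// copying any elements" from the quoted Scaladoc -- but it preserves
// case-insensitive lookup on the filtered result.
class CIMap[T](val original: Map[String, T]) {
  private val lower = original.map { case (k, v) => k.toLowerCase -> v }
  def get(key: String): Option[T] = lower.get(key.toLowerCase)
  def filterKeys(p: String => Boolean): CIMap[T] =
    new CIMap(original.filter { case (k, _) => p(k) })
}

val m = new CIMap(Map("Path" -> "/tmp", "Mode" -> "append"))
val filtered = m.filterKeys(k => !k.equalsIgnoreCase("mode")) // drop "Mode"
```

The eager version trades the documented zero-copy contract for predictable semantics; the lazy view would re-evaluate the predicate on every access and return a plain `Map` that loses case-insensitivity, which is exactly the behavior the review comment objects to.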
[GitHub] spark issue #22507: [SPARK-25495][SS]FetchedData.reset should reset all fiel...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22507 Thanks! Merging to master and 2.4.
[GitHub] spark pull request #22507: [SPARK-25495][SS]FetchedData.reset should reset a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22507#discussion_r220286600 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala --- @@ -874,6 +874,57 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) } } + + test("SPARK-25495: FetchedData.reset should reset all fields") { +val topic = newTopic() +val topicPartition = new TopicPartition(topic, 0) +testUtils.createTopic(topic, partitions = 1) + +val ds = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.isolation.level", "read_committed") + .option("subscribe", topic) + .option("startingOffsets", "earliest") + .load() + .select($"value".as[String]) + +testUtils.withTranscationalProducer { producer => + producer.beginTransaction() + (0 to 3).foreach { i => +producer.send(new ProducerRecord[String, String](topic, i.toString)).get() + } + producer.commitTransaction() +} +testUtils.waitUntilOffsetAppears(topicPartition, 5) + +val q = ds.writeStream.foreachBatch { (ds, epochId) => + if (epochId == 0) { +// Post more messages to Kafka so that the executors will fetch messages in the next batch +// and drop them. In this case, if we forget to reset `FetchedData._nextOffsetInFetchedData` --- End diff -- Send more messages here so that the executor will prefetch 8 records even if the driver tells it to fetch just 4 records. As there is a gap caused by the commit marker, `InternalKafkaConsumer` will call `reset` to drop the prefetched data. However, before my fix, it will not reset `_nextOffsetInFetchedData`. 
When running the next batch, the offset we try to fetch will match `_nextOffsetInFetchedData` and hit this line: https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L360 As we already dropped the records, `fetchedData.hasNext` will be `false`. In addition, since `offset` is also < `fetchedData.offsetAfterPoll`, we will just skip records between `offset` and `fetchedData.offsetAfterPoll`.
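The bug described above can be modeled with a toy class; the field and method names below are illustrative, not the real `KafkaDataConsumer`/`FetchedData` internals:

```scala
// Toy model of the bug: reset() drops the prefetched records, but if it does
// not also reset the "next offset" bookkeeping, a later fetch at that offset
// looks like already-consumed data and records get skipped.
class ToyFetchedData {
  private var records: Iterator[(Long, String)] = Iterator.empty
  var nextOffsetInFetchedData: Long = -2L // -2 means "unknown"
  var offsetAfterPoll: Long = -2L

  def prefetch(recs: Seq[(Long, String)], afterPoll: Long): Unit = {
    records = recs.iterator
    offsetAfterPoll = afterPoll
  }
  def next(): (Long, String) = {
    val r = records.next()
    nextOffsetInFetchedData = r._1 + 1
    r
  }
  // The fix: reset must clear *all* fields, including nextOffsetInFetchedData.
  def reset(resetNextOffset: Boolean): Unit = {
    records = Iterator.empty
    offsetAfterPoll = -2L
    if (resetNextOffset) nextOffsetInFetchedData = -2L
  }
}

val fd = new ToyFetchedData
fd.prefetch(Seq((0L, "a"), (1L, "b")), afterPoll = 2L)
fd.next(); fd.next()                    // consume offsets 0 and 1
fd.reset(resetNextOffset = false)       // buggy reset
val stale = fd.nextOffsetInFetchedData  // still 2: offset 2 looks "fetched"
fd.reset(resetNextOffset = true)        // fixed reset
val clean = fd.nextOffsetInFetchedData  // -2: unknown, forces a real re-fetch
```

With the stale value, a fetch for offset 2 would match the bookkeeping, find no buffered records, and silently skip ahead; resetting it to "unknown" forces a genuine poll instead.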
[GitHub] spark issue #22429: [SPARK-25440][SQL] Dumping query execution info to a fil...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/22429 @MaxGekk Makes sense. Could you also try to remove the default value?
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r219977185 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala --- @@ -196,7 +196,7 @@ case class RDDScanExec( } } - override def simpleString: String = { + override def simpleString(maxFields: Option[Int]): String = { s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" --- End diff -- `maxFields` should be used here.
[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22429#discussion_r219969937 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala --- @@ -250,5 +265,22 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { def codegenToSeq(): Seq[(String, String)] = { org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan) } + +/** + * Dumps debug information about query execution into the specified file. + */ +def toFile(path: String): Unit = { + val filePath = new Path(path) + val fs = FileSystem.get(filePath.toUri, sparkSession.sessionState.newHadoopConf()) + val writer = new OutputStreamWriter(fs.create(filePath)) --- End diff -- `val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath), UTF_8))`
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219940593 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -83,6 +83,17 @@ package object config { private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional + private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS = + ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true) --- End diff -- @srowen Since the user can see these accumulator updates in the public API `SparkListenerExecutorMetricsUpdate`, I would prefer to add a flag in case someone really needs these zero updates. E.g., a user may use the listener API to get all accumulators used in a task. After this change, they cannot get them until the task finishes.
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219576996 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -83,6 +83,17 @@ package object config { private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional + private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS = + ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true) + + private[spark] val EXECUTOR_HEARTBEAT_INTERVAL = +ConfigBuilder("spark.executor.heartbeatInterval") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("10s") + + private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES = + ConfigBuilder("spark.executor.heartbeat.maxFailures").intConf.createWithDefault(60) --- End diff -- nit: call `internal()` to indicate that this is not a public config.
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219575442 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -799,15 +799,21 @@ private[spark] class Executor( if (taskRunner.task != null) { taskRunner.task.metrics.mergeShuffleReadMetrics() taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) -accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators())) +val accumulatorsToReport = + if (conf.getBoolean(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key, true)) { +taskRunner.task.metrics.accumulators().filterNot(_.isZero) + } else { +taskRunner.task.metrics.accumulators() + } +accumUpdates += ((taskRunner.taskId, accumulatorsToReport)) } } val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId, executorUpdates) try { val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( - message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) + message, RpcTimeout(conf, EXECUTOR_HEARTBEAT_INTERVAL.key, "10s")) --- End diff -- Could you add a new `apply` method to `object RpcTimeout` to support `ConfigEntry`?
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219574228 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -160,7 +160,7 @@ private[spark] class Executor( * times, it should kill itself. The default value is 60. It means we will retry to send * heartbeats about 10 minutes because the heartbeat interval is 10s. */ - private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60) + private val HEARTBEAT_MAX_FAILURES = conf.getInt(EXECUTOR_HEARTBEAT_MAX_FAILURES.key, 60) --- End diff -- ditto
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219577386 --- Diff: core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala --- @@ -252,18 +253,121 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug } } + test("Heartbeat should drop zero metrics") { +heartbeatZeroMetricTest(true) + } + + test("Heartbeat should not drop zero metrics when the conf is set to false") { +heartbeatZeroMetricTest(false) + } + + private def withHeartbeatExecutor(confs: (String, String)*) + (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = { +val conf = new SparkConf +confs.foreach { case (k, v) => conf.set(k, v) } +val serializer = new JavaSerializer(conf) +val env = createMockEnv(conf, serializer) +val executor = + new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true) +val executorClass = classOf[Executor] + +// Set ExecutorMetricType.values to be a minimal set to avoid get null exceptions +val metricClass = + Utils.classForName(classOf[org.apache.spark.metrics.ExecutorMetricType].getName() + "$") +val metricTypeValues = metricClass.getDeclaredField("values") +metricTypeValues.setAccessible(true) +metricTypeValues.set( + org.apache.spark.metrics.ExecutorMetricType, + IndexedSeq(JVMHeapMemory, JVMOffHeapMemory)) + +// Save all heartbeats sent into an ArrayBuffer for verification +val heartbeats = ArrayBuffer[Heartbeat]() +val mockReceiver = mock[RpcEndpointRef] +when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any)) + .thenAnswer(new Answer[HeartbeatResponse] { +override def answer(invocation: InvocationOnMock): HeartbeatResponse = { + val args = invocation.getArguments() + val mock = invocation.getMock + heartbeats += args(0).asInstanceOf[Heartbeat] + HeartbeatResponse(false) +} + }) +val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef") +receiverRef.setAccessible(true) +receiverRef.set(executor, mockReceiver) + 
+f(executor, heartbeats) + } + + private def invokeReportHeartbeat(executor: Executor): Unit = { +val method = classOf[Executor] + .getDeclaredMethod("org$apache$spark$executor$Executor$$reportHeartBeat") +method.setAccessible(true) +method.invoke(executor) + } + + private def heartbeatZeroMetricTest(dropZeroMetrics: Boolean): Unit = { +val c = "spark.executor.heartbeat.dropZeroMetrics" -> dropZeroMetrics.toString --- End diff -- nit: EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219575944 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -799,15 +799,21 @@ private[spark] class Executor( if (taskRunner.task != null) { taskRunner.task.metrics.mergeShuffleReadMetrics() taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) -accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators())) +val accumulatorsToReport = + if (conf.getBoolean(EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS.key, true)) { --- End diff -- nit: I would prefer to keep this config value close to `HEARTBEAT_MAX_FAILURES` to avoid searching it in configs every heartbeat.
[GitHub] spark pull request #22473: [SPARK-25449][CORE] Heartbeat shouldn't include a...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22473#discussion_r219576946 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -83,6 +83,17 @@ package object config { private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional + private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_METRICS = + ConfigBuilder("spark.executor.heartbeat.dropZeroMetrics").booleanConf.createWithDefault(true) --- End diff -- Also please call `internal()` to indicate that this is not a public config.