[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/9812 [SPARK-11818][REPL] Fix ExecutorClassLoader to lookup resources from parent class loader Without the patch, two additional tests of ExecutorClassLoaderSuite fail: - "resource from parent" - "resources from parent" A detailed explanation is here: https://issues.apache.org/jira/browse/SPARK-11818?focusedCommentId=15011202&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15011202 You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-11818 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/9812.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #9812 commit 698159a0a1fcf8c0ba042daf875d037df3f8ed6f Author: Jungtaek Lim Date: 2015-11-18T15:21:18Z [SPARK-11818][REPL] Fix ExecutorClassLoader to lookup resources from parent class loader * Without patch, some tests of ExecutorClassLoaderSuite fails --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/9812#discussion_r45260904 --- Diff: core/src/main/scala/org/apache/spark/TestUtils.scala --- @@ -159,6 +159,16 @@ private[spark] object TestUtils { createCompiledClass(className, destDir, sourceFile, classpathUrls) } + def createResource( --- End diff -- @srowen Thanks, I'll inline it.
[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/9812#discussion_r45261349 --- Diff: repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala --- @@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader } } + override def getResource(name: String): URL = { +parentLoader.getResource(name) --- End diff -- @srowen It doesn't need to check ```userClassPathFirst``` since this implementation implies that the REPL never provides resources dynamically, so there's no need to look up resources from the ExecutorClassLoader itself. Btw, could the precondition be broken? I can't imagine the REPL generating resources.
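The delegation discussed here can be sketched with a plain JVM class loader. This is a hedged, Spark-free sketch (`ParentOnlyResourceLoader` and `ResourceDelegationDemo` are hypothetical names, not Spark's actual `ExecutorClassLoader`): resource lookups are answered purely from the parent, on the assumption that this loader itself never serves resources.

```scala
import java.net.URL

// Sketch: a class loader that, like the patched ExecutorClassLoader,
// answers resource lookups purely from its parent. The assumption is
// that this loader never generates resources of its own (the REPL only
// emits class files), so parent-only delegation is safe.
class ParentOnlyResourceLoader(parent: ClassLoader) extends ClassLoader(parent) {
  override def getResource(name: String): URL =
    parent.getResource(name)

  override def getResources(name: String): java.util.Enumeration[URL] =
    parent.getResources(name)
}

object ResourceDelegationDemo {
  // Returns None when neither the parent nor (trivially) this loader
  // can locate the named resource.
  def lookup(name: String): Option[URL] = {
    val loader = new ParentOnlyResourceLoader(getClass.getClassLoader)
    Option(loader.getResource(name))
  }
}
```

Note the design point from the thread: the default `ClassLoader.getResource` would consult the parent and then fall back to this loader's own `findResource`; overriding it to call the parent directly encodes the "REPL never provides resources" precondition explicitly.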
[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/9812#discussion_r45285698 --- Diff: repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala --- @@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader } } + override def getResource(name: String): URL = { +parentLoader.getResource(name) --- End diff -- @vanzin I hadn't seen the use case you mentioned, but it could make sense. To achieve it, we would have to implement findResource() and findResources() for ExecutorClassLoader, since ExecutorClassLoader cannot rely on its superclass (ClassLoader) to load classes/resources. It is easy to provide a resource URL which points to the origin scheme (http, https, ftp, hdfs), but since I'm new to classloaders, I'm wondering whether it is safe to return URLs from findResource() and findResources() which don't point to a local file. If it is not safe to return a non-local file as the URL, what's the recommended way? I can only think of downloading the files to a local temp directory on every call.
[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/9812#discussion_r45288354 --- Diff: repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala --- @@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader } } + override def getResource(name: String): URL = { +parentLoader.getResource(name) --- End diff -- @vanzin To clarify about "feature": do you want me to change the implementation of findResource() and findResources() to point to the origin scheme, and not worry about the potential oddity? Or should we forget about finding resources from the REPL URI and leave the PR as it is?
[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/9812#discussion_r45288990 --- Diff: repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala --- @@ -55,6 +57,14 @@ class ExecutorClassLoader(conf: SparkConf, classUri: String, parent: ClassLoader } } + override def getResource(name: String): URL = { +parentLoader.getResource(name) --- End diff -- @vanzin OK, thanks for the clarification! :)
[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...
Github user HeartSaVioR commented on the pull request: https://github.com/apache/spark/pull/9812#issuecomment-158338205 @vanzin Thanks for reviewing; I've addressed your comment. Please take a look again.
[GitHub] spark pull request: [SPARK-11818][REPL] Fix ExecutorClassLoader to...
Github user HeartSaVioR commented on the pull request: https://github.com/apache/spark/pull/9812#issuecomment-158377559 The failed tests seem unrelated.
[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21136 LGTM. This is also what I found today.
[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21152 [SPARK-23688][SS] Refactor tests away from rate source ## What changes were proposed in this pull request? Replace the rate source with the memory source in continuous mode. Keep using the "rate" source where tests intend to feed data periodically in the background, or need a short source name, since "memory" doesn't have a source provider. ## How was this patch tested? Ran the relevant test suites from the IDE. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-23688 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21152.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21152 commit 5aac856b3ef0118d174f016fc6a476f0facf174b Author: Jungtaek Lim Date: 2018-04-25T09:46:30Z [SPARK-23688][SS] Refactor tests away from rate source * replace rate source with memory source in continuous mode
[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21152 @jose-torres Please review this PR. Thanks! cc @jerryshao @HyukjinKwon
[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184006570 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase { val input = ContinuousMemoryStream[Int] testStream(input.toDF())( - AddData(input, 0, 1, 2), - CheckAnswer(0, 1, 2), + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2): _*), StopStream, - AddData(input, 3, 4, 5), + AddData(input, 3.to(5): _*), StartStream(), - CheckAnswer(0, 1, 2, 3, 4, 5)) + CheckAnswer(0.to(5): _*)) } test("map") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .map(r => r.getLong(0) * 2) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().map(_.getInt(0) * 2) -testStream(df, useV2Sink = true)( - StartStream(longContinuousTrigger), - AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 2)), - IncrementEpoch(), - Execute(waitForRateSourceTriggers(_, 4)), - IncrementEpoch(), - CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_ +testStream(df)( + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2).map(_ * 2): _*), + StopStream, + AddData(input, 3.to(5): _*), + StartStream(), + CheckAnswer(0.to(5).map(_ * 2): _*)) } test("flatMap") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2)) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2)) -testStream(df, useV2Sink = true)( - StartStream(longContinuousTrigger), - AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 2)), - IncrementEpoch(), - Execute(waitForRateSourceTriggers(_, 4)), - IncrementEpoch(), - CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 2)).map(Row(_ 
+testStream(df)( + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*), + StopStream, + AddData(input, 3.to(5): _*), + StartStream(), + CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*)) } test("filter") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .where('value > 5) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().where('value > 5) --- End diff -- I intended to use an untyped filter because of SPARK-24061. Once #21136 is merged we could change this, but I'm not sure we want to have both untyped and typed variants for every test.
[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184213878 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase { val input = ContinuousMemoryStream[Int] testStream(input.toDF())( - AddData(input, 0, 1, 2), - CheckAnswer(0, 1, 2), + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2): _*), StopStream, - AddData(input, 3, 4, 5), + AddData(input, 3.to(5): _*), StartStream(), - CheckAnswer(0, 1, 2, 3, 4, 5)) + CheckAnswer(0.to(5): _*)) } test("map") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .map(r => r.getLong(0) * 2) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().map(_.getInt(0) * 2) -testStream(df, useV2Sink = true)( - StartStream(longContinuousTrigger), - AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 2)), - IncrementEpoch(), - Execute(waitForRateSourceTriggers(_, 4)), - IncrementEpoch(), - CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_ +testStream(df)( + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2).map(_ * 2): _*), --- End diff -- @jose-torres Yeah, my intention was to ensure Spark operations work the same as Scala collection methods, but sure, enumerating is also fine since we can all see the expected result easily. Are you in favor of enumerating the literals we already know, instead of calculating them, for all the tests? Or just for this line? I'd like to apply the approach consistently.
[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21152 Also cc @tdas since he has reviewed SS-related PRs (including continuous mode) so far.
[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184236837 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase { val input = ContinuousMemoryStream[Int] testStream(input.toDF())( - AddData(input, 0, 1, 2), - CheckAnswer(0, 1, 2), + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2): _*), StopStream, - AddData(input, 3, 4, 5), + AddData(input, 3.to(5): _*), StartStream(), - CheckAnswer(0, 1, 2, 3, 4, 5)) + CheckAnswer(0.to(5): _*)) } test("map") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .map(r => r.getLong(0) * 2) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().map(_.getInt(0) * 2) -testStream(df, useV2Sink = true)( - StartStream(longContinuousTrigger), - AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 2)), - IncrementEpoch(), - Execute(waitForRateSourceTriggers(_, 4)), - IncrementEpoch(), - CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_ +testStream(df)( + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2).map(_ * 2): _*), + StopStream, + AddData(input, 3.to(5): _*), + StartStream(), + CheckAnswer(0.to(5).map(_ * 2): _*)) } test("flatMap") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2)) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2)) -testStream(df, useV2Sink = true)( - StartStream(longContinuousTrigger), - AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 2)), - IncrementEpoch(), - Execute(waitForRateSourceTriggers(_, 4)), - IncrementEpoch(), - CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 2)).map(Row(_ 
+testStream(df)( + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*), + StopStream, + AddData(input, 3.to(5): _*), + StartStream(), + CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*)) } test("filter") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .where('value > 5) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().where('value > 5) --- End diff -- @jose-torres What do you think about this? Would it be better to have tests for both untyped and typed? Code duplication wouldn't be that big, since I guess the verification logic can be reused for every test.
[GitHub] spark pull request #21063: [SPARK-23886][Structured Streaming] Update query ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21063#discussion_r184241625 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -111,7 +112,12 @@ trait ProgressReporter extends Logging { logDebug("Starting Trigger Calculation") lastTriggerStartTimestamp = currentTriggerStartTimestamp currentTriggerStartTimestamp = triggerClock.getTimeMillis() -currentStatus = currentStatus.copy(isTriggerActive = true) +// isTriggerActive field is kept false for ContinuousExecution +// since it is tied to MicroBatchExecution +this match { --- End diff -- nit: someone may be concerned that a trait needs to be aware of its concrete implementations. There look to be two options: 1. Extract a method that only updates currentStatus for a starting trigger, defaulting to `isTriggerActive = true`, and let `ContinuousExecution` override that method. 2. Just override `startTrigger()` in `ContinuousExecution`: call `super.startTrigger()` and then update currentStatus once more. This opens a very small window in which other threads could read invalid status information (isTriggerActive = true), but it requires less change, if that's acceptable.
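Option 2 above can be sketched with minimal stand-in types. This is a hedged sketch, not Spark's code: `Status`, `ProgressReporter`, and `ContinuousExecution` here are simplified mocks that only illustrate the override-then-reset pattern and its small visibility window.

```scala
// Simplified stand-in for StreamingQueryStatus.
case class Status(isTriggerActive: Boolean, message: String)

trait ProgressReporter {
  @volatile protected var currentStatus: Status =
    Status(isTriggerActive = false, message = "init")

  def status: Status = currentStatus

  // Shared trigger-start logic; the micro-batch default marks the
  // trigger as active.
  def startTrigger(): Unit = {
    currentStatus = currentStatus.copy(
      isTriggerActive = true, message = "trigger started")
  }
}

// Option 2: reuse the shared logic via super, then immediately reset
// the flag. Between the two assignments, a concurrent reader could
// briefly observe isTriggerActive = true -- the "very small window"
// mentioned in the comment.
class ContinuousExecution extends ProgressReporter {
  override def startTrigger(): Unit = {
    super.startTrigger()
    currentStatus = currentStatus.copy(isTriggerActive = false)
  }
}
```

Option 1 avoids that window by making the status update itself overridable, at the cost of a slightly larger refactor in the trait.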
[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184321762 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase { val input = ContinuousMemoryStream[Int] testStream(input.toDF())( - AddData(input, 0, 1, 2), - CheckAnswer(0, 1, 2), + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2): _*), StopStream, - AddData(input, 3, 4, 5), + AddData(input, 3.to(5): _*), StartStream(), - CheckAnswer(0, 1, 2, 3, 4, 5)) + CheckAnswer(0.to(5): _*)) --- End diff -- Thanks for the pointer. I started from `(0 to 5)`, but the Spark Scala style guide mentions avoiding infix notation, so I was a bit puzzled (I was not sure whether `to` counts as an operator). Will update.
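For readers unfamiliar with the two spellings being debated: `0.to(5)` and `(0 to 5)` are the same call (`to` is an ordinary method, so dot notation is just the non-infix form), and `: _*` is what expands a `Range` into a varargs argument list as in `AddData(input, 0.to(2): _*)`. A minimal Scala 2 illustration:

```scala
// `0.to(5)` and `(0 to 5)` build the same Range; `to` is a regular
// method on Int, so dot notation is merely the non-infix spelling.
val dotted = 0.to(5)
val infix  = 0 to 5

// `: _*` expands a sequence into a varargs parameter, which is how
// AddData(input, 0.to(2): _*) passes 0, 1, 2 as individual arguments.
def sum(xs: Int*): Int = xs.sum
val total = sum(0.to(2): _*)   // same as sum(0, 1, 2)
```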
[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184322699 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase { val input = ContinuousMemoryStream[Int] testStream(input.toDF())( - AddData(input, 0, 1, 2), - CheckAnswer(0, 1, 2), + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2): _*), StopStream, - AddData(input, 3, 4, 5), + AddData(input, 3.to(5): _*), StartStream(), - CheckAnswer(0, 1, 2, 3, 4, 5)) + CheckAnswer(0.to(5): _*)) } test("map") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .map(r => r.getLong(0) * 2) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().map(_.getInt(0) * 2) -testStream(df, useV2Sink = true)( - StartStream(longContinuousTrigger), - AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 2)), - IncrementEpoch(), - Execute(waitForRateSourceTriggers(_, 4)), - IncrementEpoch(), - CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_ +testStream(df)( + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2).map(_ * 2): _*), --- End diff -- Got it. Looks like we can reduce the range and list out the literals. Will update.
[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21152 Thanks for reviewing. I have addressed the review comments.
[GitHub] spark issue #21152: [SPARK-23688][SS] Refactor tests away from rate source
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21152 @jerryshao Thanks for merging! My Apache JIRA ID is "kabhwan".
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185198458 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.io.Closeable +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset +import org.apache.spark.util.ThreadUtils + +/** + * A wrapper for a continuous processing data reader, including a reading queue and epoch markers. + * + * This will be instantiated once per partition - successive calls to compute() in the + * [[ContinuousDataSourceRDD]] will reuse the same reader. This is required to get continuity of + * offsets across epochs. + * + * For performance reasons, this is very weakly encapsulated. 
There are six handles for the RDD: + * * currentOffset - contains the offset of the most recent row which a compute() iterator has sent + *upwards. The RDD is responsible for advancing this. + * * currentEpoch - the epoch which is currently occurring. The RDD is responsible for incrementing + *this before ending the compute() iterator. + * * queue - the queue of incoming rows (row, offset) or epoch markers (null, null). The + *ContinuousQueuedDataReader writes into this queue, and RDD.compute() will read from it. + * * {epochPoll|dataReader}Failed - flags to check if the epoch poll and data reader threads are + *still running. These threads won't be restarted if they fail, so the RDD should intercept + *this state when convenient to fail the query. + * * close() - to close this reader when the query is going to shut down. + */ +class ContinuousQueuedDataReader( +split: Partition, +context: TaskContext, +dataQueueSize: Int, +epochPollIntervalMs: Long) extends Closeable { + private val reader = split.asInstanceOf[DataSourceRDDPartition[UnsafeRow]] +.readerFactory.createDataReader() + + // Important sequencing - we must get our starting point before the provider threads start running + var currentOffset: PartitionOffset = ContinuousDataSourceRDD.getBaseReader(reader).getOffset + var currentEpoch: Long = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong + + // This queue contains two types of messages: + // * (null, null) representing an epoch boundary. + // * (row, off) containing a data row and its corresponding PartitionOffset. 
+ val queue = new ArrayBlockingQueue[(UnsafeRow, PartitionOffset)](dataQueueSize) + + val epochPollFailed = new AtomicBoolean(false) + val dataReaderFailed = new AtomicBoolean(false) + + private val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) + + private val epochPollExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +s"epoch-poll--$coordinatorId--${context.partitionId()}") + val epochPollRunnable = new EpochPollRunnable(queue, context, epochPollFailed) + epochPollExecutor.scheduleWithFixedDelay( +epochPollRunnable, 0, epochPollIntervalMs, TimeUnit.MILLISECONDS) + + val dataReaderThread = new DataReaderThread(reader, queue, context, dataReaderFailed) + dataReaderThread.setDaemon(true) + dataReaderThread.start() + + context.addTaskCompletionListener(_ => { --- End diff -- Maybe better to just call `close` if `this` is visible. --- --
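The queue contract described in the diff — data entries `(row, offset)` interleaved with `(null, null)` epoch markers — can be sketched without Spark. This is a hedged sketch using `String` in place of `UnsafeRow` and `java.lang.Long` in place of `PartitionOffset`; `EpochQueueDemo` and `drainEpoch` are hypothetical names, not part of the PR.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object EpochQueueDemo {
  // String stands in for UnsafeRow, java.lang.Long for PartitionOffset
  // (a boxed type so that null can mark an epoch boundary).
  type Entry = (String, java.lang.Long)
  private val EpochMarker: Entry = (null, null)

  // Drain rows from the queue until the next epoch marker, mirroring
  // how the RDD's compute() iterator consumes one epoch.
  def drainEpoch(queue: ArrayBlockingQueue[Entry], pollTimeoutMs: Long): Seq[String] = {
    val rows = Seq.newBuilder[String]
    var done = false
    while (!done) {
      val entry = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS)
      if (entry == null) {
        // Poll timed out: in the real reader this is where the task
        // interrupt/completion and failure flags would be checked
        // before polling again.
      } else if (entry == EpochMarker) {
        done = true          // epoch boundary: end this epoch's iterator
      } else {
        rows += entry._1     // data row: hand it upwards
      }
    }
    rows.result()
  }
}
```

The timed `poll` rather than a blocking `take` is what gives the reader a chance to notice interruption and thread-failure flags between entries.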
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21200#discussion_r185187424 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import java.util.concurrent.TimeUnit +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset} +import org.apache.spark.util.ThreadUtils + +/** + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]] + * to read from the remote source, and polls that queue for incoming rows. 
+ * + * Note that continuous processing calls compute() multiple times, and the same + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split. + */ +class ContinuousDataSourceRDD( +sc: SparkContext, +sqlContext: SQLContext, +@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]]) + extends RDD[UnsafeRow](sc, Nil) { + + private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize + private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs + + // When computing the same partition multiple times, we need to use the same data reader to + // do so for continuity in offsets. + @GuardedBy("dataReaders") + private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] = +mutable.Map[Partition, ContinuousQueuedDataReader]() + + override protected def getPartitions: Array[Partition] = { +readerFactories.zipWithIndex.map { + case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory) +}.toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +// If attempt number isn't 0, this is a task retry, which we don't support. 
+if (context.attemptNumber() != 0) { + throw new ContinuousTaskRetryException() +} + +val readerForPartition = dataReaders.synchronized { + if (!dataReaders.contains(split)) { +dataReaders.put( + split, + new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs)) + } + + dataReaders(split) +} + +val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY) +val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get) +new Iterator[UnsafeRow] { + private val POLL_TIMEOUT_MS = 1000 + + private var currentEntry: (UnsafeRow, PartitionOffset) = _ + + override def hasNext(): Boolean = { +while (currentEntry == null) { + if (context.isInterrupted() || context.isCompleted()) { +currentEntry = (null, null) --- End diff -- This line is effectively no-op unless we exit the loop afterwards. So better to clarify the behavior and fix it. I know this code block is just same as of now so it might be out of topic. If we would like to address it from other issue, I'm happy to file an issue and also work on this. --- - To unsubscribe, e-mail: reviews-unsubscr...@
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185194384

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.TimeUnit
    +import javax.annotation.concurrent.GuardedBy
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark._
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.{Row, SQLContext}
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
    +import org.apache.spark.sql.sources.v2.reader._
    +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * The bottom-most RDD of a continuous processing read task. Wraps a [[ContinuousQueuedDataReader]]
    + * to read from the remote source, and polls that queue for incoming rows.
    + *
    + * Note that continuous processing calls compute() multiple times, and the same
    + * [[ContinuousQueuedDataReader]] instance will/must be shared between each call for the same split.
    + */
    +class ContinuousDataSourceRDD(
    +    sc: SparkContext,
    +    sqlContext: SQLContext,
    +    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
    +  extends RDD[UnsafeRow](sc, Nil) {
    +
    +  private val dataQueueSize = sqlContext.conf.continuousStreamingExecutorQueueSize
    +  private val epochPollIntervalMs = sqlContext.conf.continuousStreamingExecutorPollIntervalMs
    +
    +  // When computing the same partition multiple times, we need to use the same data reader to
    +  // do so for continuity in offsets.
    +  @GuardedBy("dataReaders")
    +  private val dataReaders: mutable.Map[Partition, ContinuousQueuedDataReader] =
    +    mutable.Map[Partition, ContinuousQueuedDataReader]()
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    readerFactories.zipWithIndex.map {
    +      case (readerFactory, index) => new DataSourceRDDPartition(index, readerFactory)
    +    }.toArray
    +  }
    +
    +  override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = {
    +    // If attempt number isn't 0, this is a task retry, which we don't support.
    +    if (context.attemptNumber() != 0) {
    +      throw new ContinuousTaskRetryException()
    +    }
    +
    +    val readerForPartition = dataReaders.synchronized {
    +      if (!dataReaders.contains(split)) {
    +        dataReaders.put(
    +          split,
    +          new ContinuousQueuedDataReader(split, context, dataQueueSize, epochPollIntervalMs))
    +      }
    +
    +      dataReaders(split)
    +    }
    +
    +    val coordinatorId = context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)
    +    val epochEndpoint = EpochCoordinatorRef.get(coordinatorId, SparkEnv.get)
    +    new Iterator[UnsafeRow] {
    +      private val POLL_TIMEOUT_MS = 1000
    +
    +      private var currentEntry: (UnsafeRow, PartitionOffset) = _
    +
    +      override def hasNext(): Boolean = {
    +        while (currentEntry == null) {
    +          if (context.isInterrupted() || context.isCompleted()) {
    +            currentEntry = (null, null)
    +          }
    +          if (readerForPartition.dataReaderFailed.get()) {
    +            throw new SparkException(
    +              "data read failed", readerForPartition.dataReaderThread.failureReason)
    +          }
    +          if (readerForPartition.epochPollFailed.get()) {
    +            throw new SparkException(
    +              "epoch poll failed", readerForPartition.epochPollRunna
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185201032

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala ---
    @@ -46,28 +46,34 @@ case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPla
           case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
         }

    -    val rdd = query.execute()
    +    val rdd = new ContinuousWriteRDD(query.execute(), writerFactory)
    +    val messages = new Array[WriterCommitMessage](rdd.partitions.length)

         logInfo(s"Start processing data source writer: $writer. " +
    -      s"The input RDD has ${rdd.getNumPartitions} partitions.")
    -    // Let the epoch coordinator know how many partitions the write RDD has.
    +      s"The input RDD has ${messages.length} partitions.")
         EpochCoordinatorRef.get(
    -      sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    -      sparkContext.env)
    +        sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
    +        sparkContext.env)
           .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))

         try {
           // Force the RDD to run so continuous processing starts; no data is actually being collected
           // to the driver, as ContinuousWriteRDD outputs nothing.
    -      sparkContext.runJob(
    -        rdd,
    -        (context: TaskContext, iter: Iterator[InternalRow]) =>
    -          WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
    -        rdd.partitions.indices)
    +      rdd.collect()
         } catch {
           case _: InterruptedException =>
    -        // Interruption is how continuous queries are ended, so accept and ignore the exception.
    +        // Interruption is how continuous queries are ended, so accept and ignore the exception.
           case cause: Throwable =>
    +        logError(s"Data source writer $writer is aborting.")

--- End diff --

Could you please explain why this additional handling is needed? ContinuousWriteRDD already handles the error case.
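The driver-side pattern under discussion (swallow interruption, surface everything else) can be sketched in isolation. This is a hedged stand-in, not the committed Spark code: `runJob` takes the place of `rdd.collect()` and `logError` takes the place of Spark's logging.

```scala
// Interruption is the normal way a continuous query ends, so it is swallowed;
// any other failure is logged and rethrown so the query fails visibly.
object ContinuousJobRunner {
  def runContinuousJob(runJob: () => Unit, logError: String => Unit): Unit = {
    try {
      runJob()
    } catch {
      case _: InterruptedException =>
        // Expected on query stop: accept and ignore.
      case cause: Throwable =>
        logError(s"Continuous write is aborting: ${cause.getMessage}")
        throw cause
    }
  }
}
```

The open question in the comment above is whether this driver-side abort logging adds anything when the per-partition RDD already handles failures; the sketch only shows the shape of the handler being debated.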
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r18524

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochPollRunnable.scala ---
    @@ -0,0 +1,61 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.continuous
    +
    +import java.util.concurrent.BlockingQueue
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.spark.{SparkEnv, TaskContext}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
    +
    +/**
    + * The epoch marker component of [[ContinuousQueuedDataReader]]. Populates the queue with
    + * (null, null) when a new epoch marker arrives.
    + */
    +class EpochPollRunnable(
    +    queue: BlockingQueue[(UnsafeRow, PartitionOffset)],
    +    context: TaskContext,
    +    failedFlag: AtomicBoolean)
    +  extends Thread with Logging {
    +  private[continuous] var failureReason: Throwable = _
    +
    +  private val epochEndpoint = EpochCoordinatorRef.get(
    +    context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), SparkEnv.get)
    +  // Note that this is *not* the same as the currentEpoch in [[ContinuousDataQueuedReader]]! That
    +  // field represents the epoch wrt the data being processed. The currentEpoch here is just a
    +  // counter to ensure we send the appropriate number of markers if we fall behind the driver.
    +  private var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
    +
    +  override def run(): Unit = {
    +    try {
    +      val newEpoch = epochEndpoint.askSync[Long](GetCurrentEpoch)
    +      for (i <- currentEpoch to newEpoch - 1) {

--- End diff --

Please correct me if I'm missing something. My understanding is that a gap bigger than 1 should only occur when the array queue fills up and blocks the epoch thread from putting markers for longer than the trigger interval. Any other situation (error cases) should just crash the whole query so that recovery happens from scratch; that's why we can ignore the missing case.
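The catch-up behavior discussed above (one marker per epoch the local counter has fallen behind) can be sketched as a pure function. `markersToSend` is a hypothetical helper for illustration, not part of EpochPollRunnable:

```scala
// If the driver's epoch has advanced past the local counter (e.g. because a
// full queue blocked marker insertion for longer than the trigger interval),
// one marker is emitted per missed epoch so every epoch boundary downstream
// is represented exactly once.
object EpochCatchUp {
  def markersToSend(currentEpoch: Long, newEpoch: Long): Seq[Long] =
    (currentEpoch until newEpoch).toSeq // same range as `currentEpoch to newEpoch - 1`
}
```

For example, a local counter at 3 with the driver at 6 yields markers for epochs 3, 4, and 5, matching the `for (i <- currentEpoch to newEpoch - 1)` loop in the diff.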
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185282844

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- (excerpt)

    +          if (context.isInterrupted() || context.isCompleted()) {
    +            currentEntry = (null, null)

--- End diff --

I meant that the current logic still calls queue.poll again instead of using the assigned epoch marker value, even when the if statement matches. That looks unintended, right? We can rearrange the logic to fail fast on the exception cases, and use if/else to fix this.
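The fail-fast, if/else rearrangement proposed above can be sketched with simplified stand-ins: string tuples instead of `(UnsafeRow, PartitionOffset)`, a single `AtomicBoolean` for the reader's failure flags, and a `finished` closure for `isInterrupted`/`isCompleted`. This is an illustration of the proposed control flow, not the committed fix.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object PollLoopFixed {
  private val PollTimeoutMs = 10L

  // Returns the next queue entry, or the (null, null) marker once the task is
  // finished. Failure flags are checked first (fail fast), and the if/else
  // guarantees we never poll after assigning the end marker.
  def nextEntry(
      queue: ArrayBlockingQueue[(String, String)],
      failed: AtomicBoolean,
      finished: () => Boolean): (String, String) = {
    var currentEntry: (String, String) = null
    while (currentEntry == null) {
      if (failed.get()) {
        throw new RuntimeException("data read failed")
      }
      if (finished()) {
        currentEntry = (null, null) // non-null tuple, so the loop condition exits
      } else {
        currentEntry = queue.poll(PollTimeoutMs, TimeUnit.MILLISECONDS)
      }
    }
    currentEntry
  }
}
```

Because the poll sits in the `else` branch, the end marker can no longer be overwritten within the same iteration, which is the behavior change the review asks for.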
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185316062

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185317000

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala --- (excerpt)

    +          if (context.isInterrupted() || context.isCompleted()) {
    +            currentEntry = (null, null)

--- End diff --

Yeah, that's what I also missed. Thanks for correcting. :)
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185326551

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r185328820

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
[GitHub] spark pull request #21200: [SPARK-24039][SS] Do continuous processing writes...
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21200#discussion_r18520

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala ---
[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21222 [SPARK-24161][SS] Enable debug package feature on structured streaming ## What changes were proposed in this pull request? Currently, the debug package has an implicit class "DebugQuery" which enriches Dataset to provide debug features on the Dataset class. It doesn't work with structured streaming: it requires the query to already be started, and the information can only be retrieved from StreamingQuery, not Dataset. I guess that's why "explain" had to be placed on StreamingQuery even though it already exists on Dataset. This patch adds a new implicit class "DebugStreamQuery" which enriches StreamingQuery to provide similar debug features on the StreamingQuery class. ## How was this patch tested? Added relevant unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24161 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21222.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21222 commit c1ad1c557e6165455457adb6f148d6d9616548a1 Author: Jungtaek Lim Date: 2018-05-03T02:26:48Z SPARK-24161 Enable debug package feature on structured streaming * added an implicit class which adds debug features for StreamingQuery * added unit tests for the new functionality
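The implicit-class enrichment the PR description relies on can be sketched as below. `StreamingQueryLike` and `debugSummary` are illustrative stand-ins, not the actual Spark API; the sketch only shows how an implicit class bolts debug methods onto an existing type without modifying it.

```scala
object DebugImplicits {
  // Illustrative stand-in for StreamingQuery; not the real Spark interface.
  trait StreamingQueryLike {
    def name: String
    def lastProgressJson: String
  }

  // The implicit class makes debugSummary() callable on any StreamingQueryLike,
  // mirroring how DebugStreamQuery enriches StreamingQuery in the patch.
  implicit class DebugStreamQueryLike(query: StreamingQueryLike) {
    def debugSummary(): String =
      s"query=${query.name}, lastProgress=${query.lastProgressJson}"
  }
}
```

With `import DebugImplicits._` in scope, `someQuery.debugSummary()` resolves through the implicit class, which is the same mechanism the existing `DebugQuery` uses for Dataset.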
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 @tdas @jose-torres @jerryshao @arunmahadevan Kindly ping to review.
[GitHub] spark issue #21207: SPARK-24136: Fix MemoryStreamDataReader.next to skip sle...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21207 @tdas @jerryshao @HyukjinKwon Kindly ping to trigger test and review.
[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r186032252 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala --- @@ -88,23 +100,62 @@ package object debug { } } + /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan into one String + * + * @param query the streaming query for codegen + * @return single String containing all WholeStageCodegen subtrees and corresponding codegen + */ + def codegenString(query: StreamingQuery): String = { +val msg = query match { + case w: StreamExecution if w.lastExecution != null => --- End diff -- Addressed.
[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r186247474 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala --- @@ -116,6 +168,30 @@ package object debug { } } + implicit class DebugStreamQuery(query: StreamingQuery) extends Logging { +def debug(): Unit = { + query match { +case w: StreamExecution => --- End diff -- My bad. Fixed. Also changed the unit tests so that your reported case is covered by a test.
[GitHub] spark issue #21222: [SPARK-24161][SS] Enable debug package feature on struct...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21222 Kindly ping. I guess debugging the last batch might not be that attractive, but printing the codegen would be helpful to anyone who wants to investigate or debug in detail.
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188636306 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue} +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +/** + * Messages for the UnsafeRowReceiver endpoint. Either an incoming row or an epoch marker. + */ +private[shuffle] sealed trait UnsafeRowReceiverMessage extends Serializable +private[shuffle] case class ReceiverRow(row: UnsafeRow) extends UnsafeRowReceiverMessage +private[shuffle] case class ReceiverEpochMarker() extends UnsafeRowReceiverMessage + +/** + * RPC endpoint for receiving rows into a continuous processing shuffle task. 
+ */ +private[shuffle] class UnsafeRowReceiver(val rpcEnv: RpcEnv) +extends ThreadSafeRpcEndpoint with Logging { + private val queue = new ArrayBlockingQueue[UnsafeRowReceiverMessage](1024) --- End diff -- I guess we can handle item 2 as a TODO if we would like to keep the focus on the proposed patch.
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188628202 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import java.util.UUID + +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.util.NextIterator + +case class ContinuousShuffleReadPartition(index: Int) extends Partition { + // Initialized only on the executor, and only once even as we call compute() multiple times. 
+ lazy val (receiver, endpoint) = { +val env = SparkEnv.get.rpcEnv +val receiver = new UnsafeRowReceiver(env) +val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver) +TaskContext.get().addTaskCompletionListener { ctx => + env.stop(endpoint) +} +(receiver, endpoint) + } +} + +/** + * RDD at the bottom of each continuous processing shuffle task, reading from the --- End diff -- If my understanding is right, the bottom will be the RDD which is injected just before shuffling, so it would be neither reader nor writer. `first` and `last` would be good alternatives for me if `bottom` looks ambiguous. As @arunmahadevan stated, the comment looks incomplete.
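The `lazy val` trick in the diff above ("initialized only on the executor, and only once even as we call compute() multiple times") can be sketched in isolation. `DemoPartition` and its field values are hypothetical names for illustration; the point is that a lazy pattern definition runs its body at most once, on first access, and every later access returns the same pair.

```scala
// Sketch of the init-once-per-partition pattern. The body of the lazy val is
// only evaluated on first access (i.e. on the executor after deserialization);
// repeated compute() calls that touch `receiver` get the same instance back.
case class DemoPartition(index: Int) {
  lazy val (receiver, endpoint) = {
    val r = new Object  // stand-in for an RPC receiver
    (r, s"endpoint-$index")
  }
}
```

This is why the real partition can register a single task-completion listener inside the block: the registration happens exactly once per partition object.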
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188638980 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import org.apache.spark.{TaskContext, TaskContextImpl} +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection} +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{DataType, IntegerType} + +class ContinuousShuffleReadSuite extends StreamTest { + + private def unsafeRow(value: Int) = { +UnsafeProjection.create(Array(IntegerType : DataType))( + new GenericInternalRow(Array(value: Any))) + } + + var ctx: TaskContextImpl = _ + + override def beforeEach(): Unit = { +super.beforeEach() +ctx = TaskContext.empty() +TaskContext.setTaskContext(ctx) + } + + override def afterEach(): Unit = { +ctx.markTaskCompleted(None) +ctx = null +super.afterEach() + } + + test("receiver stopped with row last") { +val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint +endpoint.askSync[Unit](ReceiverEpochMarker()) +endpoint.askSync[Unit](ReceiverRow(unsafeRow(111))) + +ctx.markTaskCompleted(None) +val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver +eventually(timeout(streamingTimeout)) { + assert(receiver.stopped.get()) +} + } + + test("receiver stopped with marker last") { +val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint +endpoint.askSync[Unit](ReceiverRow(unsafeRow(111))) +endpoint.askSync[Unit](ReceiverEpochMarker()) + +ctx.markTaskCompleted(None) +val receiver = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].receiver +eventually(timeout(streamingTimeout)) { + assert(receiver.stopped.get()) +} + } + + test("one epoch") { +val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint 
+endpoint.askSync[Unit](ReceiverRow(unsafeRow(111))) +endpoint.askSync[Unit](ReceiverRow(unsafeRow(222))) +endpoint.askSync[Unit](ReceiverRow(unsafeRow(333))) +endpoint.askSync[Unit](ReceiverEpochMarker()) + +val iter = rdd.compute(rdd.partitions(0), ctx) +assert(iter.next().getInt(0) == 111) +assert(iter.next().getInt(0) == 222) +assert(iter.next().getInt(0) == 333) +assert(!iter.hasNext) + } + + test("multiple epochs") { +val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint +endpoint.askSync[Unit](ReceiverRow(unsafeRow(111))) +endpoint.askSync[Unit](ReceiverEpochMarker()) +endpoint.askSync[Unit](ReceiverRow(unsafeRow(222))) +endpoint.askSync[Unit](ReceiverRow(unsafeRow(333))) +endpoint.askSync[Unit](ReceiverEpochMarker()) + +val firstEpoch = rdd.compute(rdd.partitions(0), ctx) +assert(firstEpoch.next().getInt(0) == 111) +assert(!firstEpoch.hasNext) + +val secondEpoch = rdd.compute(rdd.partitions(0), ctx) +assert(secondEpoch.next().getInt(0) == 222) +assert(secondEpoch.next().getInt(0) == 333) +assert(!secondEpoch.hasNext) + } + + test("empty epochs
[GitHub] spark pull request #21337: [SPARK-24234][SS] Reader for continuous processin...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21337#discussion_r188632188 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/ContinuousShuffleReadRDD.scala --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import java.util.UUID + +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.util.NextIterator + +case class ContinuousShuffleReadPartition(index: Int) extends Partition { + // Initialized only on the executor, and only once even as we call compute() multiple times. 
+ lazy val (receiver, endpoint) = { +val env = SparkEnv.get.rpcEnv +val receiver = new UnsafeRowReceiver(env) +val endpoint = env.setupEndpoint(UUID.randomUUID().toString, receiver) +TaskContext.get().addTaskCompletionListener { ctx => + env.stop(endpoint) +} +(receiver, endpoint) + } +} + +/** + * RDD at the bottom of each continuous processing shuffle task, reading from the + */ +class ContinuousShuffleReadRDD(sc: SparkContext, numPartitions: Int) +extends RDD[UnsafeRow](sc, Nil) { + + override protected def getPartitions: Array[Partition] = { +(0 until numPartitions).map(ContinuousShuffleReadPartition).toArray + } + + override def compute(split: Partition, context: TaskContext): Iterator[UnsafeRow] = { +val receiver = split.asInstanceOf[ContinuousShuffleReadPartition].receiver + +new NextIterator[UnsafeRow] { + override def getNext(): UnsafeRow = receiver.poll() match { +case ReceiverRow(r) => r +case ReceiverEpochMarker() => --- End diff -- Does this ensure at-least-once? If so, we could start from this and improve it in another PR, as @jose-torres stated.
[GitHub] spark pull request #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStorePr...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21357 [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider to remove duplicated logic between operations on delta file and snapshot file ## What changes were proposed in this pull request? This patch refactors HDFSBackedStateStoreProvider to extract the logic duplicated between operations on the delta file and the snapshot file, as well as documenting the structure of the state file. ## How was this patch tested? Existing unit tests: StateStoreSuite You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24311 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21357.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21357 commit 8ad2a3f8112662a865ee1dbaf7c5269197c3ee4f Author: Jungtaek Lim Date: 2018-05-17T21:17:30Z SPARK-24311 Refactor HDFSBackedStateStoreProvider to remove duplicated logic between operations on delta file and snapshot file * also removed unused import statements
[GitHub] spark issue #21357: [SPARK-24311][SS] Refactor HDFSBackedStateStoreProvider ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21357 cc. @tdas
[GitHub] spark pull request #21388: [SPARK-24336][SQL] Support 'pass through' transfo...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21388 [SPARK-24336][SQL] Support 'pass through' transformation in BasicOperators ## What changes were proposed in this pull request? Enable 'pass through' transformations in BasicOperators via reflection, so that every pair of plans which only requires converting a LogicalPlan to a SparkPlan by calling `planLater()` can be transformed automatically. Adding a new transformation only requires adding the pair to a map. ## How was this patch tested? Existing unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24336 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21388.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21388 commit 4c2f700a5d7944c13681581df7379e9653c5d588 Author: Jungtaek Lim Date: 2018-05-21T23:31:26Z SPARK-24336 Support 'pass through' transformation in BasicOperators
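The reflective 'pass through' idea can be sketched as follows, with simplified stand-in types: `Logical`, `Physical`, and the two node classes are hypothetical, not Catalyst's, and the sketch instantiates the physical node through a one-argument constructor rather than reproducing `planLater()` semantics.

```scala
trait Logical
trait Physical

case class LocalLimitLogical(limit: Int) extends Logical
// Physical node whose only constructor takes the matching logical node.
class LocalLimitPhysical(val logical: LocalLimitLogical) extends Physical

object PassThroughPlanner {
  // Each 'pass through' transformation is just an entry in this map; adding a
  // new pair requires no new match case in the planner itself.
  private val passThrough: Map[Class[_ <: Logical], Class[_ <: Physical]] = Map(
    classOf[LocalLimitLogical] -> classOf[LocalLimitPhysical])

  // Look up the physical class for the node's runtime class and invoke its
  // one-arg constructor reflectively.
  def plan(node: Logical): Option[Physical] =
    passThrough.get(node.getClass).map { physicalClass =>
      physicalClass.getConstructor(node.getClass).newInstance(node).asInstanceOf[Physical]
    }
}
```

This also illustrates the trade-off raised in the review thread: the map keeps the planner short, but the constructor lookup is only checked at runtime, which is the usual objection to reflection in a planner.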
[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21388 Thanks @HyukjinKwon for reviewing. Addressed review comments.
[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21388 retest this please
[GitHub] spark issue #21388: [SPARK-24336][SQL] Support 'pass through' transformation...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21388 @hvanhovell I also think some people might not want reflection magic here (I was one of them, but came to believe it was worth doing), so I'm happy to close the PR if others voice the same opinion. For me, reflection looks like the only way to answer `Can we automate these 'pass through' operations?`, so if we decide to reject the approach, we might be better off either removing that line or documenting the restriction(s) instead, unless we have another immediate idea for achieving it without reflection. Btw, I'd be very happy if you could spend some time explaining which points make you concerned about reflection in the planner. Documenting that explicitly would save other contributors from making the same attempt and save everyone's time.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190129892 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = +mutable.Map.empty[Int, Boolean].withDefaultValue(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) + private val completion = new ExecutorCompletionService[UnsafeRowReceiverMessage](executor) + + private def completionTask(writerId: Int) = new Callable[UnsafeRowReceiverMessage] { +override def call(): UnsafeRowReceiverMessage = queues(writerId).take() } - override def close(): Unit = {} + // Initialize by submitting tasks to read the first row from each writer. + (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId))) + + /** + * In each call to getNext(), we pull the next row available in the completion queue, and then + * submit another task to read the next row from the writer which returned it. + * + * When a writer sends an epoch marker, we note that it's finished and don't submit another + * task for it in this epoch. The iterator is over once all writers have sent an epoch marker. 
+ */ + override def getNext(): UnsafeRow = { +var nextRow: UnsafeRow = null +while (nextRow == null) { + nextRow = completion.poll(checkpointIntervalMs, TimeUnit.MILLISECONDS) match { +case null => + // Try again if the poll didn't wait long enough to get a real result. + // But we should be getting at least an epoch marker every checkpoint interval. + logWarning( +s"Completion service failed to make progress after $checkpointIntervalMs ms") + null + +// The completion service guarantees this future will be available immediately. +case future => future.get() match { + case ReceiverRow(writerId, r) => +// Start reading the next element in the queue we just took from. +completion.submit(completionTask(writerId)) +r + // TODO use writerId --- End diff -- It looks like it is not needed as of now.
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190125731 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = +mutable.Map.empty[Int, Boolean].withDefaultValue(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) --- End diff -- As I commented earlier in the design doc, I was in favor of a single queue, because I thought it minimizes the thread count, which may avoid unnecessary contention (as well as code complexity in this case), and also keeps the backpressure condition fairly simple (RPC requests block until the queue has room to write). But having read some articles comparing `multiple writers, single reader on a single queue` vs `single writer, single reader on multiple queues per writer`, the second approach looks like it performs better unless we introduce a highly-optimized queue like the Disruptor. So the approach looks great to me for now, and we could at least consider adopting a dedicated queue library as a replacement if we run into trouble.
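The `single writer, single reader on multiple queues per writer` layout under discussion can be sketched like this (illustrative class and types, not the actual UnsafeRowReceiver):

```scala
import java.util.concurrent.ArrayBlockingQueue

// One bounded queue per writer: each writer thread only ever touches its own
// queue, so writers never contend with each other; the single reader drains
// whichever queue it chooses. put() blocks when a queue is full, which gives
// the simple per-writer backpressure condition mentioned above.
class PerWriterQueues[T](numWriters: Int, capacity: Int) {
  private val queues =
    IndexedSeq.fill(numWriters)(new ArrayBlockingQueue[T](capacity))

  def put(writerId: Int, elem: T): Unit = queues(writerId).put(elem)
  def take(writerId: Int): T = queues(writerId).take()
}
```

The trade-off the comment describes falls out directly: a single shared queue needs fewer threads but makes all writers contend on one `put()` path, while this layout removes cross-writer contention at the cost of one reader task per writer.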
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190120836 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = --- End diff -- The map will always contain `(writerId, true)`, so the value is not needed at all, and we only care about writer IDs in the range 0 until numShuffleWriters, so it might be better to consider alternatives. This could also be a Set pre-initialized with 0 until numShuffleWriters, from which we remove an element when we receive that writer's marker; if an element is still in the set, it means we haven't received a marker from that writer yet. Along the same lines, it could be a pre-initialized Array of Boolean flags.
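The suggested alternative (a pre-initialized set of writer IDs that shrinks as markers arrive) could look like the sketch below; `EpochMarkerTracker` and its method names are hypothetical, chosen only to illustrate the data-structure choice.

```scala
import scala.collection.mutable

// Track writers that have NOT yet sent an epoch marker. Starting from the full
// id range and removing ids avoids the map whose values are always `true`.
class EpochMarkerTracker(numShuffleWriters: Int) {
  private val pending = mutable.Set((0 until numShuffleWriters): _*)

  def markerReceived(writerId: Int): Unit = pending -= writerId

  // The epoch is complete once every writer's marker has arrived.
  def epochComplete: Boolean = pending.isEmpty
}
```

A pre-sized `Array[Boolean]` indexed by writer ID would work equally well and avoids hashing, at the cost of an O(n) scan (or a separate counter) to decide completion.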
[GitHub] spark pull request #21385: [SPARK-24234][SS] Support multiple row writers in...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190131693 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/UnsafeRowReceiver.scala --- @@ -56,20 +69,73 @@ private[shuffle] class UnsafeRowReceiver( override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: UnsafeRowReceiverMessage => - queue.put(r) + queues(r.writerId).put(r) context.reply(()) } override def read(): Iterator[UnsafeRow] = { new NextIterator[UnsafeRow] { - override def getNext(): UnsafeRow = queue.take() match { -case ReceiverRow(r) => r -case ReceiverEpochMarker() => - finished = true - null + // An array of flags for whether each writer ID has gotten an epoch marker. + private val writerEpochMarkersReceived = +mutable.Map.empty[Int, Boolean].withDefaultValue(false) + + private val executor = Executors.newFixedThreadPool(numShuffleWriters) --- End diff -- And I'm now also seeing this approach as an alternative way to deal with alignment (don't buffer rows explicitly; just stop reading after the epoch marker comes in). Nice approach.
[GitHub] spark issue #21445: [SPARK-24404][SS] Increase currentEpoch when meet a Epoc...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21445 @LiangchangZ Looks like the patch is needed only together with #21353 #21332 #21293 as of now, right? If so, please state that condition in the JIRA issue description as well as the PR description so that we don't get confused. There's a case where the reader and writer are composed together in one task (the current state of continuous processing), and after this patch there would be two places that increase the epoch for the same thread. Please note that I'm commenting on top of the current implementation, not considering #21353 #21332 #21293.
[GitHub] spark issue #21445: [SPARK-24404][SS] Increase currentEpoch when meet a Epoc...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21445 @LiangchangZ > In the real CP situation, reader and writer may be always in different tasks, right? Continuous mode already supports some valid use cases, and putting everything in one task would be fastest for such use cases, though tasks can still be parallelized by partition. Unless we have a valid reason to separate the reader and writer even in a non-shuffle query, it would be better to keep it as it is.
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191605388 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala --- @@ -58,39 +46,29 @@ class ContinuousShuffleReadSuite extends StreamTest { super.afterEach() } - test("receiver stopped with row last") { -val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) -val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint -send( - endpoint, - ReceiverEpochMarker(0), - ReceiverRow(0, unsafeRow(111)) -) + private implicit def unsafeRow(value: Int) = { --- End diff -- Just curious: is there a reason to rearrange these functions, this one and the two below? They look the same except for changing this function to `implicit`.
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191629272 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala --- @@ -288,4 +267,153 @@ class ContinuousShuffleReadSuite extends StreamTest { val thirdEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet assert(thirdEpoch == Set("writer1-row1", "writer2-row0")) } + + test("one epoch") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +writer.write(Iterator(1, 2, 3)) + +assert(readEpoch(reader) == Seq(1, 2, 3)) + } + + test("multiple epochs") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +writer.write(Iterator(1, 2, 3)) +writer.write(Iterator(4, 5, 6)) + +assert(readEpoch(reader) == Seq(1, 2, 3)) +assert(readEpoch(reader) == Seq(4, 5, 6)) + } + + test("empty epochs") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +writer.write(Iterator()) +writer.write(Iterator(1, 2)) +writer.write(Iterator()) +writer.write(Iterator()) +writer.write(Iterator(3, 4)) +writer.write(Iterator()) + +assert(readEpoch(reader) == Seq()) +assert(readEpoch(reader) == Seq(1, 2)) +assert(readEpoch(reader) == Seq()) +assert(readEpoch(reader) == Seq()) +assert(readEpoch(reader) == Seq(3, 4)) +assert(readEpoch(reader) == Seq()) + } + + test("blocks waiting for writer") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +val readerEpoch = 
reader.compute(reader.partitions(0), ctx) + +val readRowThread = new Thread { + override def run(): Unit = { +assert(readerEpoch.toSeq.map(_.getInt(0)) == Seq(1)) + } +} +readRowThread.start() + +eventually(timeout(streamingTimeout)) { + assert(readRowThread.getState == Thread.State.TIMED_WAITING) +} + +// Once we write the epoch the thread should stop waiting and succeed. +writer.write(Iterator(1)) +readRowThread.join() + } + + test("multiple writer partitions") { --- End diff -- Would we want to have another test which covers out-of-order epochs between writers (if that's a valid case for us), or rely on the test in ContinuousShuffleReadRDD?
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191629554 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala --- @@ -58,39 +46,29 @@ class ContinuousShuffleReadSuite extends StreamTest { super.afterEach() } - test("receiver stopped with row last") { -val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) -val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint -send( - endpoint, - ReceiverEpochMarker(0), - ReceiverRow(0, unsafeRow(111)) -) + private implicit def unsafeRow(value: Int) = { --- End diff -- And where does it leverage the `implicit` attribute of this method? I'm not sure it is really needed, but I'm reviewing on the GitHub page so I might be missing something here.
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total size of states in ...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21469 [SPARK-24441][SS] Expose total size of states in HDFSBackedStateStoreProvider

## What changes were proposed in this pull request?

This patch exposes the estimated size of the cache (loadedMaps) in HDFSBackedStateStoreProvider as a custom metric of StateStore. While it refers to loadedMaps directly, there is only one StateStoreWriter referring to a given StateStoreProvider, so the value is exposed without being aggregated multiple times. The current state metrics are safe to aggregate for the same reason.

## How was this patch tested?

Tested manually. Below is a snapshot of the UI page reflecting the patch: https://user-images.githubusercontent.com/1317309/40788976-4ad93d8c-652c-11e8-88f1-337be5162588.png

You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24441 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21469.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21469 commit dc11338650842a246a4bce9280d130607ceca281 Author: Jungtaek Lim Date: 2018-05-31T14:38:00Z [SPARK-24441][SS] Expose total size of states in HDFSBackedStateStoreProvider * expose estimation of size of cache (loadMaps) in HDFSBackedStateStoreProvider * as a custom metric of StateStore
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 cc. @tdas @jose-torres @jerryshao @HyukjinKwon @arunmahadevan
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 Thanks @HyukjinKwon for reviewing. Addressed the PR title as well as fixing the nit.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 @jose-torres Ah yes, I forgot that shallow copying has been occurring, so while each new map holds its own map entries, the row objects are shared across versions. Thanks for pointing it out. Will update the description.
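The shallow-copy behavior acknowledged above, where a new state version gets its own map but shares the row objects of the previous version, can be modeled with ordinary dicts. This is an illustrative sketch, not the HDFSBackedStateStoreProvider code:

```python
# Hypothetical model: each state version is a dict; committing a new
# version shallow-copies the previous one, so the row objects (values)
# are shared across versions rather than duplicated.
v1 = {"k1": bytearray(b"row1"), "k2": bytearray(b"row2")}
v2 = dict(v1)                  # shallow copy: new map, same row objects
v2["k3"] = bytearray(b"row3")  # only genuinely new rows allocate memory

assert v2["k1"] is v1["k1"]    # identical object, not a copy
assert "k3" not in v1          # the old version's map is untouched
```

This is why a naive per-version size sum would overstate memory use: only the map entries are per-version, while the rows themselves are shared.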
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 @arunmahadevan I didn't add the metric to StateOperatorProgress because this behavior is specific to HDFSBackedStateStoreProvider (though it is the only implementation available in Apache Spark), so I'm not sure this metric can be treated as a general one. (@tdas what do you think about this?) Btw, the cache is only cleaned up when the maintenance operation is in progress, so there could be more than 100 versions in the map. Not sure why it shows 150x, but I couldn't find a missing spot in the patch. Maybe the issue is in SizeEstimator.estimate()? One thing we need to check is how SizeEstimator.estimate() calculates memory usage when UnsafeRow objects are shared across versions. If SizeEstimator adds the size of an object every time it is referenced, it will report much higher memory usage than actual.
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21477 Thanks @HyukjinKwon for cc'ing me. I haven't covered the Python part of Structured Streaming, so it would take some time to go through the code. Hoping I can participate in reviewing in time.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 Looks like the size is added only once for the same identity in SizeEstimator.estimate(), so SizeEstimator.estimate() is working correctly in this case. There might be other edge cases, but I'm not sure.
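The identity-based behavior observed above, counting each distinct object once no matter how many versions reference it, can be sketched with a visited set keyed by identity. The sizing logic here is a toy analogue, not Spark's `SizeEstimator`:

```python
import sys

def estimate(obj, seen=None):
    """Toy deep-size estimate that counts each distinct object once,
    tracked by identity, so shared references are not double counted."""
    if seen is None:
        seen = set()
    if id(obj) in seen:
        return 0              # already counted: shared reference
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(estimate(k, seen) + estimate(v, seen)
                    for k, v in obj.items())
    return size

shared = bytearray(1024)      # stand-in for a row shared across versions
v1 = {"k": shared}
v2 = {"k": shared}            # second version shares the same row object
both = {"v1": v1, "v2": v2}

# The shared ~1KB row contributes only once to the combined total,
# so the combined estimate is less than summing the versions naively.
assert estimate(both) < estimate(v1) + estimate(v2) + sys.getsizeof(both)
```

If the visited set were dropped, the shared row would be counted per version, which is exactly the over-reporting concern raised in the earlier comment.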
[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21497 [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be compatible with netcat again

## What changes were proposed in this pull request?

TextSocketMicroBatchReader was no longer compatible with netcat because it launched a temporary reader for reading the schema, closed it, and re-opened another reader. While a reliable socket server should be able to handle this without any issue, the nc command normally can't handle multiple connections and simply exits when the temporary reader closes. This patch fixes TextSocketMicroBatchReader to be compatible with netcat again, by deferring opening the socket to the first call of planInputPartitions() instead of the constructor.

## How was this patch tested?

Added a unit test which fails without the patch and succeeds with it. Also tested manually.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24466 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21497.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21497 commit 7b875279742fa295ab513cf8a489830237953d0c Author: Jungtaek Lim Date: 2018-06-05T07:57:42Z SPARK-24466 Fix TextSocketMicroBatchReader to be compatible with netcat again
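The fix described, deferring the socket connection from the constructor to first use, can be sketched with a lazily connecting client. This is a simplified stand-in (`LazySocketSource` is a hypothetical name), not the actual TextSocketMicroBatchReader code:

```python
import socket
import threading

class LazySocketSource:
    """Connects on first use instead of in the constructor, so a
    single-connection server like plain `nc` sees only one connect."""
    def __init__(self, host, port):
        self.host, self.port = host, port
        self._sock = None                      # no connection yet

    def _ensure_connected(self):
        if self._sock is None:
            self._sock = socket.create_connection((self.host, self.port))
        return self._sock

    def read_line(self):
        return self._ensure_connected().makefile().readline()

# Demo against a throwaway local server that serves one connection.
server = socket.socket()
server.bind(("127.0.0.1", 0))
server.listen(1)
port = server.getsockname()[1]

def serve():
    conn, _ = server.accept()
    conn.sendall(b"hello\n")
    conn.close()

threading.Thread(target=serve, daemon=True).start()

src = LazySocketSource("127.0.0.1", port)
assert src._sock is None             # constructing did not connect
assert src.read_line() == "hello\n"  # first read triggers the connection
```

With the eager-connect pattern the patch removes, constructing the schema-inference reader would already have consumed the server's only accepted connection.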
[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21497 The failing tests were: * org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a sbt.testing.NestedSuiteSelector) * org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a sbt.testing.NestedSuiteSelector) The test failures are not relevant to the patch.
[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21497 retest this please
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 Also added a custom metric for the count of versions stored in loadedMaps. This is a new screenshot: https://user-images.githubusercontent.com/1317309/40978481-b46ad324-690e-11e8-9b0f-e80528612a62.png
[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21497 cc. @tdas @jose-torres @jerryshao @arunmahadevan
[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21497 @arunmahadevan Yes, before the patch Spark connected to the socket server twice: once for getting the schema, and once for reading data. And the `-k` flag is only supported by specific distributions, which is why I had to set a breakpoint and start nc again after the temporary reader stopped. For example, on my local dev machine (macOS 10.12.6), netcat doesn't support the -k flag.

```
netcat (The GNU Netcat) 0.7.1
```
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 I agree that the current cache approach may consume excessive memory unnecessarily; that also matches my finding in #21469. The issue is not that simple, however, because in micro-batch mode each batch should read the previous version of the state; otherwise it has to read from the file system, in the worst case seeking and reading multiple files in a remote file system. So the previous version of the state is encouraged to be available in memory. There are three cases here (please add any I'm missing):

1. fail before commit
2. committed, but the batch failed afterwards
3. committed, and the batch succeeds

It might be better to think about all the cases.
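The trade-off above, keeping the previous version hot for the next batch while letting older versions fall back to the file system, could be sketched as a simple retention policy. This is purely illustrative (`VersionedStateCache` and `retain` are invented names), not the provider's actual cache logic:

```python
class VersionedStateCache:
    """Keeps the newest `retain` versions in memory; older versions are
    evicted and would have to be reloaded from the checkpoint files."""
    def __init__(self, retain=2):
        self.retain = retain
        self.versions = {}          # version number -> state map

    def commit(self, version, state):
        self.versions[version] = state
        # Evict everything older than the newest `retain` versions.
        for v in sorted(self.versions)[:-self.retain]:
            del self.versions[v]

    def get(self, version):
        return self.versions.get(version)  # None -> must load from files

cache = VersionedStateCache(retain=2)
for v in range(1, 5):
    cache.commit(v, {"count": v})

assert cache.get(4) == {"count": 4}   # current version: hot
assert cache.get(3) == {"count": 3}   # previous version: hot for next batch
assert cache.get(1) is None           # evicted; a replay would hit the file system
```

The three failure cases listed above map onto this directly: cases 1 and 2 need the previous committed version to still be resident, while case 3 only needs the latest.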
[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21497#discussion_r193277616 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala --- @@ -35,10 +34,11 @@ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset} -import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} + --- End diff -- Thanks for letting me know. Addressed.
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193289099 --- Diff: python/pyspark/sql/tests.py --- @@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self): finally: q.stop() shutil.rmtree(tmpPath) +''' +class ForeachWriterTester: + +def __init__(self, spark): +self.spark = spark +self.input_dir = tempfile.mkdtemp() +self.open_events_dir = tempfile.mkdtemp() +self.process_events_dir = tempfile.mkdtemp() +self.close_events_dir = tempfile.mkdtemp() + +def write_open_event(self, partitionId, epochId): +self._write_event( +self.open_events_dir, +{'partition': partitionId, 'epoch': epochId}) + +def write_process_event(self, row): +self._write_event(self.process_events_dir, {'value': 'text'}) + +def write_close_event(self, error): +self._write_event(self.close_events_dir, {'error': str(error)}) + +def write_input_file(self): +self._write_event(self.input_dir, "text") + +def open_events(self): +return self._read_events(self.open_events_dir, 'partition INT, epoch INT') + +def process_events(self): +return self._read_events(self.process_events_dir, 'value STRING') + +def close_events(self): +return self._read_events(self.close_events_dir, 'error STRING') + +def run_streaming_query_on_writer(self, writer, num_files): +try: +sdf = self.spark.readStream.format('text').load(self.input_dir) +sq = sdf.writeStream.foreach(writer).start() +for i in range(num_files): +self.write_input_file() +sq.processAllAvailable() +sq.stop() +finally: +self.stop_all() + +def _read_events(self, dir, json): +rows = self.spark.read.schema(json).json(dir).collect() +dicts = [row.asDict() for row in rows] +return dicts + +def _write_event(self, dir, event): +import random +file = open(os.path.join(dir, str(random.randint(0, 10))), 'w') +file.write("%s\n" % str(event)) +file.close() + +def stop_all(self): +for q in self.spark._wrapped.streams.active: +q.stop() + +def __getstate__(self): +return (self.open_events_dir, 
self.process_events_dir, self.close_events_dir) + +def __setstate__(self, state): +self.open_events_dir, self.process_events_dir, self.close_events_dir = state + +def test_streaming_foreach_with_simple_function(self): +tester = self.ForeachWriterTester(self.spark) + +def foreach_func(row): +tester.write_process_event(row) + +tester.run_streaming_query_on_writer(foreach_func, 2) +self.assertEqual(len(tester.process_events()), 2) + +def test_streaming_foreach_with_basic_open_process_close(self): +tester = self.ForeachWriterTester(self.spark) + +class ForeachWriter: +def open(self, partitionId, epochId): +tester.write_open_event(partitionId, epochId) +return True + +def process(self, row): +tester.write_process_event(row) + +def close(self, error): +tester.write_close_event(error) + +tester.run_streaming_query_on_writer(ForeachWriter(), 2) + +open_events = tester.open_events() +self.assertEqual(len(open_events), 2) +self.assertSetEqual(set([e['epoch'] for e in open_events]), {0, 1}) + +self.assertEqual(len(tester.process_events()), 2) + +close_events = tester.close_events() +self.assertEqual(len(close_events), 2) +self.assertSetEqual(set([e['error'] for e in close_events]), {'None'}) + +def test_streaming_foreach_with_open_returning_false(self): +tester = self.ForeachWriterTester(self.spark) + +class ForeachWriter: +def open(self, partitionId, epochId): +tester.write_open_event(partitionId, epochId) +
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193284293 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. 
Hence, it is strongly --- End diff -- nit: deserialized` `copy (space)
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193289567 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala --- @@ -20,10 +20,48 @@ package org.apache.spark.sql import org.apache.spark.annotation.InterfaceStability /** - * A class to consume data generated by a `StreamingQuery`. Typically this is used to send the - * generated data to external systems. Each partition will use a new deserialized instance, so you - * usually should do all the initialization (e.g. opening a connection or initiating a transaction) - * in the `open` method. + * The abstract class for writing custom logic to process data generated by a query. + * This is often used to write the output of a streaming query to arbitrary storage systems. --- End diff -- Looks like the doc is duplicated between `foreach()` and `ForeachWriter`. I'm not sure how we can leave a reference in the Python doc instead of duplicating the content, but even if the Python doc doesn't support such references, some parts of the content could be placed in one place or the other, not both.
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193291809 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonForeachWriter.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.io.File +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.api.python._ +import org.apache.spark.internal.Logging +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.sql.ForeachWriter +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.{NextIterator, Utils} + +class PythonForeachWriter(func: PythonFunction, schema: StructType) + extends ForeachWriter[UnsafeRow] { + + private lazy val context = TaskContext.get() + private lazy val buffer = new PythonForeachWriter.UnsafeRowBuffer( +context.taskMemoryManager, new File(Utils.getLocalDir(SparkEnv.get.conf)), schema.fields.length) + private lazy val inputRowIterator = buffer.iterator + + private lazy val inputByteIterator = { +EvaluatePython.registerPicklers() +val objIterator = inputRowIterator.map { row => EvaluatePython.toJava(row, schema) } +new SerDeUtil.AutoBatchedPickler(objIterator) + } + + private lazy val pythonRunner = { +val conf = SparkEnv.get.conf +val bufferSize = conf.getInt("spark.buffer.size", 65536) +val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) +PythonRunner(func, bufferSize, reuseWorker) + } + + private lazy val outputIterator = +pythonRunner.compute(inputByteIterator, context.partitionId(), context) + + override def open(partitionId: Long, version: Long): Boolean = { +outputIterator // initialize everything +TaskContext.get.addTaskCompletionListener { _ => buffer.close() } +true + } + + override def process(value: UnsafeRow): Unit = { +buffer.add(value) + } + + override def close(errorOrNull: Throwable): Unit = { +buffer.allRowsAdded() +if (outputIterator.hasNext) outputIterator.next() // to throw python exception if there was one + } +} + +object PythonForeachWriter { + + /** + * A buffer that 
is designed for the sole purpose of buffering UnsafeRows in PythonForeahWriter. --- End diff -- nit: PythonForeachWriter
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193286066 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. 
Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a +connection or starting a transaction) be done open after the `open(...)` +method has been called, which signifies that the task is ready to generate data. + +* The lifecycle of the methods are as follows. + +For each partition with ``partition_id``: + +... For each batch/epoch of streaming data with ``epoch_id``: + +... Method ``open(partitionId, epochId)`` is called. + +... If ``open(...)`` returns true, for each row in the partition and +batch/epoch, method ``process(row)`` is called. + +... Method ``close(errorOrNull)`` is called with error (if any) seen while +processing rows. + +Important points to note: + +* The `partitionId` and `epochId` can be used to deduplicate generated data when +failures cause reprocessing of some input data. This depends on the execution +mode of the query. If the streaming query is being executed in the micro-batch +mode, then every partition represented by a unique tuple (partition_id, epoch_id) +is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used +to deduplicate and/or transactionally commit data and achieve exactly-once +guarantees. However, if the streaming query is being executed in the continuous +mode, then this guarantee does not hold and therefore should not be used for +deduplication. + +* The ``close()`` method (if exists) is will be called if `open()` method exists and +returns successfully (irrespective of the return value), except if the Python +crashes in the middle. + +.. note:: Evolving. + +>>> # Print every row using a function +>>> writer = sdf.writeStream.foreach(lambda x: print(x)) +>>> # Print every row using a object with process() method +>>> class RowPrinter: +... def open(self, partition_id, epoch_id): +... print("Opened %d, %d" % (
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193286932 --- Diff: python/pyspark/sql/tests.py --- @@ -1884,7 +1885,164 @@ def test_query_manager_await_termination(self): finally: q.stop() shutil.rmtree(tmpPath) +''' +class ForeachWriterTester: + +def __init__(self, spark): +self.spark = spark +self.input_dir = tempfile.mkdtemp() +self.open_events_dir = tempfile.mkdtemp() +self.process_events_dir = tempfile.mkdtemp() +self.close_events_dir = tempfile.mkdtemp() + +def write_open_event(self, partitionId, epochId): +self._write_event( +self.open_events_dir, +{'partition': partitionId, 'epoch': epochId}) + +def write_process_event(self, row): +self._write_event(self.process_events_dir, {'value': 'text'}) + +def write_close_event(self, error): +self._write_event(self.close_events_dir, {'error': str(error)}) + +def write_input_file(self): +self._write_event(self.input_dir, "text") + +def open_events(self): +return self._read_events(self.open_events_dir, 'partition INT, epoch INT') + +def process_events(self): +return self._read_events(self.process_events_dir, 'value STRING') + +def close_events(self): +return self._read_events(self.close_events_dir, 'error STRING') + +def run_streaming_query_on_writer(self, writer, num_files): +try: +sdf = self.spark.readStream.format('text').load(self.input_dir) +sq = sdf.writeStream.foreach(writer).start() +for i in range(num_files): +self.write_input_file() +sq.processAllAvailable() +sq.stop() +finally: +self.stop_all() + +def _read_events(self, dir, json): +rows = self.spark.read.schema(json).json(dir).collect() +dicts = [row.asDict() for row in rows] +return dicts + +def _write_event(self, dir, event): +import random +file = open(os.path.join(dir, str(random.randint(0, 10))), 'w') --- End diff -- We might feel more convenient with `with` statement, and renaming `file` to `f` or `fw` or so. Please ignore if there's specific reason not to use `with` statement. 
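As suggested in the comment above, the `_write_event` helper could use a context manager. The sketch below is illustrative, not the PR's final code; it assumes events are serialized as JSON, which the `spark.read.schema(...).json(dir)` read side implies:

```python
import json
import os
import random
import tempfile

def _write_event(dir, event):
    # A 'with' block guarantees the file handle is closed even if the
    # write raises; 'f' also avoids shadowing the name 'file'.
    with open(os.path.join(dir, str(random.randint(0, 10))), 'w') as f:
        f.write(json.dumps(event))

events_dir = tempfile.mkdtemp()
_write_event(events_dir, {'partition': 0, 'epoch': 1})
```

The random file name keeps the sketch faithful to the original line; a real test helper might prefer `tempfile.mkstemp` to rule out collisions.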
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193284839 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. 
Hence, it is strongly
+recommended that any initialization for writing data (e.g. opening a
--- End diff --
> any initialization for writing data (e.g. opening a connection or starting a transaction) be done open after the `open(...)` method has been called

`be done open` reads a bit oddly. It would be better if we could polish the sentence.
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193285667 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. 
Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a +connection or starting a transaction) be done open after the `open(...)` +method has been called, which signifies that the task is ready to generate data. + +* The lifecycle of the methods are as follows. + +For each partition with ``partition_id``: + +... For each batch/epoch of streaming data with ``epoch_id``: + +... Method ``open(partitionId, epochId)`` is called. + +... If ``open(...)`` returns true, for each row in the partition and +batch/epoch, method ``process(row)`` is called. + +... Method ``close(errorOrNull)`` is called with error (if any) seen while +processing rows. + +Important points to note: + +* The `partitionId` and `epochId` can be used to deduplicate generated data when +failures cause reprocessing of some input data. This depends on the execution +mode of the query. If the streaming query is being executed in the micro-batch +mode, then every partition represented by a unique tuple (partition_id, epoch_id) +is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used +to deduplicate and/or transactionally commit data and achieve exactly-once +guarantees. However, if the streaming query is being executed in the continuous +mode, then this guarantee does not hold and therefore should not be used for +deduplication. + +* The ``close()`` method (if exists) is will be called if `open()` method exists and +returns successfully (irrespective of the return value), except if the Python +crashes in the middle. + +.. note:: Evolving. + +>>> # Print every row using a function +>>> writer = sdf.writeStream.foreach(lambda x: print(x)) +>>> # Print every row using a object with process() method +>>> class RowPrinter: +... def open(self, partition_id, epoch_id): +... print("Opened %d, %d" % (
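The docstring quoted above (truncated here) specifies the writer-object contract. A minimal object satisfying that open/process/close lifecycle might look like the sketch below; the class and attribute names are illustrative, not from the PR:

```python
class RowCollector:
    """Illustrative foreach writer object following the documented
    open/process/close lifecycle; not code from the PR itself."""

    def __init__(self):
        self.rows = []
        self.closed_with = None

    def open(self, partition_id, epoch_id):
        # Called once per partition and epoch; returning True asks
        # Spark to call process() for each row of this partition.
        self.partition_id = partition_id
        self.epoch_id = epoch_id
        return True

    def process(self, row):
        self.rows.append(row)

    def close(self, error):
        # error stays None unless processing failed.
        self.closed_with = error

# Drive the lifecycle by hand, the way Spark would for one partition/epoch.
writer = RowCollector()
if writer.open(0, 7):
    for row in ("a", "b"):
        writer.process(row)
writer.close(None)
```

In a real query the object would be passed to `sdf.writeStream.foreach(writer)` and Spark would drive these calls per task.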
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193304316

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala ---
@@ -20,10 +20,48 @@ package org.apache.spark.sql
 import org.apache.spark.annotation.InterfaceStability
 /**
- * A class to consume data generated by a `StreamingQuery`. Typically this is used to send the
- * generated data to external systems. Each partition will use a new deserialized instance, so you
- * usually should do all the initialization (e.g. opening a connection or initiating a transaction)
- * in the `open` method.
+ * The abstract class for writing custom logic to process data generated by a query.
+ * This is often used to write the output of a streaming query to arbitrary storage systems.
--- End diff --
Ah yes, my bad. I confused this with Python.
[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21497#discussion_r193372564

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
@@ -256,6 +246,66 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
   }
 }
+  test("verify ServerThread only accepts the first connection") {
+    serverThread = new ServerThread()
+    serverThread.start()
+
+    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
--- End diff --
Yeah, I actually copied that line blindly from elsewhere in the file. Agreed it would be better to use the key.
[GitHub] spark pull request #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21497#discussion_r193374662

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala ---
@@ -256,6 +246,66 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
   }
 }
+  test("verify ServerThread only accepts the first connection") {
+    serverThread = new ServerThread()
+    serverThread.start()
+
+    withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") {
--- End diff --
Thanks for the guidance; addressed.
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 @arunmahadevan Added the custom metrics in the state store to the streaming query status as well. You can see `providerLoadedMapSize` added to `stateOperators.customMetrics` in the output below. I had to exclude `providerLoadedMapCountOfVersions` from the list, since the average metric is implemented in a somewhat tricky way and doesn't look easy to aggregate for the streaming query status. We may want to reimplement SQLMetric and its subclasses to make sure everything works correctly without the tricky approach, but that doesn't look trivial to address and I think it is out of scope for this PR.
```
18/06/06 22:51:23 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "7564a0b7-e3b2-4d53-b246-b774ab04e586",
  "runId" : "8dd34784-080c-4f86-afaf-ac089902252d",
  "name" : null,
  "timestamp" : "2018-06-06T13:51:15.467Z",
  "batchId" : 4,
  "numInputRows" : 547,
  "inputRowsPerSecond" : 67.15776550030694,
  "processedRowsPerSecond" : 65.94333936106088,
  "durationMs" : {
    "addBatch" : 7944,
    "getBatch" : 1,
    "getEndOffset" : 0,
    "queryPlanning" : 61,
    "setOffsetRange" : 5,
    "triggerExecution" : 8295,
    "walCommit" : 158
  },
  "eventTime" : {
    "avg" : "2018-06-06T13:51:10.313Z",
    "max" : "2018-06-06T13:51:14.250Z",
    "min" : "2018-06-06T13:51:07.098Z",
    "watermark" : "2018-06-06T13:50:36.676Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 20,
    "numRowsUpdated" : 16,
    "memoryUsedBytes" : 26679,
    "customMetrics" : {
      "providerLoadedMapSize" : 181911
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[apachelogs-v2]]",
    "startOffset" : {
      "apachelogs-v2" : {
        "2" : 489056,
        "4" : 489053,
        "1" : 489055,
        "3" : 489051,
        "0" : 489053
      }
    },
    "endOffset" : {
      "apachelogs-v2" : {
        "2" : 489056,
        "4" : 489053,
        "1" : 489055,
        "3" : 489051,
        "0" : 489053
      }
    },
    "numInputRows" : 547,
    "inputRowsPerSecond" : 67.15776550030694,
    "processedRowsPerSecond" : 65.94333936106088
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@60999714"
  }
}
```
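For anyone consuming this progress output, the nested custom metric can be extracted with a few lines of plain Python. This is an illustrative helper, not part of the PR; the JSON sample mirrors the `stateOperators` structure shown above:

```python
import json

def state_custom_metric(progress_json, metric):
    """Extract a custom state-store metric from each stateOperators entry."""
    progress = json.loads(progress_json)
    return [op.get("customMetrics", {}).get(metric)
            for op in progress.get("stateOperators", [])]

# Trimmed-down sample of the progress JSON structure shown above.
progress_json = '''
{"stateOperators": [{"numRowsTotal": 20,
                     "customMetrics": {"providerLoadedMapSize": 181911}}]}
'''
print(state_custom_metric(progress_json, "providerLoadedMapSize"))  # [181911]
```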
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total estimated size of ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r193622940

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---
@@ -231,7 +231,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   test("event ordering") {
     val listener = new EventCollector
     withListenerAdded(listener) {
-      for (i <- 1 to 100) {
+      for (i <- 1 to 50) {
--- End diff --
After the patch this test started failing; it just means more time is needed to run the loop 100 times, not that the logic is broken. Decreasing the count works for me.
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 @TomaszGaweda @aalobaidi Please correct me if I'm missing something here. At the start of every batch, the state store loads the previous version of the state so that it can be read and written. If we unload all versions "after committing", the cache will no longer contain the previous version, and the store will have to load the state by reading files, adding huge latency at the start of each batch. That's why I described the three cases earlier: to avoid loading state from files when a new batch starts. Please apply #21469 manually and see how much memory HDFSBackedStateStoreProvider consumes due to storing multiple versions (it shows the state size of the latest version as well as the overall state size in the cache). Please also observe and provide numbers showing how large the latency is now and how large it would be after the patch. We always have to ask ourselves whether we are addressing the issue correctly.
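The caching argument above can be made concrete with a toy model. This is purely illustrative Python, not `HDFSBackedStateStoreProvider` code; it only demonstrates why keeping the just-committed version cached avoids a slow file read at the next batch start:

```python
class ToyVersionCache:
    """Toy model of a loaded-versions cache; 'files' stands in for
    slow remote storage such as HDFS."""

    def __init__(self, files):
        self.files = files        # version -> persisted state map
        self.loaded = {}          # in-memory cache of versions
        self.file_loads = 0       # number of slow loads from storage

    def load(self, version):
        if version not in self.loaded:
            self.file_loads += 1  # cache miss: read from files
            self.loaded[version] = dict(self.files.get(version, {}))
        return self.loaded[version]

    def commit(self, version, state, evict_all_older=False):
        self.files[version] = state
        self.loaded[version] = state
        if evict_all_older:
            # Keep only the version we just committed.
            self.loaded = {version: state}

cache = ToyVersionCache(files={0: {}})
prev = cache.load(0)                            # first batch: one file read
cache.commit(1, {"k": 1}, evict_all_older=True)
cache.load(1)                                   # next batch: served from cache
```

Evicting everything *including* the committed version would instead force a file read at every batch start, which is the latency concern raised above.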
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 Retaining versions of state is also relevant to snapshotting the last version to files: HDFSBackedStateStoreProvider doesn't snapshot a version if it doesn't exist in loadedMaps. So we may want to check whether this option also works with the current approach to snapshotting.
[GitHub] spark pull request #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21477#discussion_r193740695 --- Diff: python/pyspark/sql/streaming.py --- @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None): self._jwrite = self._jwrite.trigger(jTrigger) return self +def foreach(self, f): +""" +Sets the output of the streaming query to be processed using the provided writer ``f``. +This is often used to write the output of a streaming query to arbitrary storage systems. +The processing logic can be specified in two ways. + +#. A **function** that takes a row as input. +This is a simple way to express your processing logic. Note that this does +not allow you to deduplicate generated data when failures cause reprocessing of +some input data. That would require you to specify the processing logic in the next +way. + +#. An **object** with a ``process`` method and optional ``open`` and ``close`` methods. +The object can have the following methods. + +* ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing +(for example, open a connection, start a transaction, etc). Additionally, you can +use the `partition_id` and `epoch_id` to deduplicate regenerated data +(discussed later). + +* ``process(row)``: *Non-optional* method that processes each :class:`Row`. + +* ``close(error)``: *Optional* method that finalizes and cleans up (for example, +close connection, commit transaction, etc.) after all rows have been processed. + +The object will be used by Spark in the following way. + +* A single copy of this object is responsible of all the data generated by a +single task in a query. In other words, one instance is responsible for +processing one partition of the data generated in a distributed manner. + +* This object must be serializable because each task will get a fresh +serialized-deserializedcopy of the provided object. 
Hence, it is strongly +recommended that any initialization for writing data (e.g. opening a +connection or starting a transaction) be done open after the `open(...)` +method has been called, which signifies that the task is ready to generate data. + +* The lifecycle of the methods are as follows. + +For each partition with ``partition_id``: + +... For each batch/epoch of streaming data with ``epoch_id``: + +... Method ``open(partitionId, epochId)`` is called. + +... If ``open(...)`` returns true, for each row in the partition and +batch/epoch, method ``process(row)`` is called. + +... Method ``close(errorOrNull)`` is called with error (if any) seen while +processing rows. + +Important points to note: + +* The `partitionId` and `epochId` can be used to deduplicate generated data when +failures cause reprocessing of some input data. This depends on the execution +mode of the query. If the streaming query is being executed in the micro-batch +mode, then every partition represented by a unique tuple (partition_id, epoch_id) +is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used +to deduplicate and/or transactionally commit data and achieve exactly-once +guarantees. However, if the streaming query is being executed in the continuous +mode, then this guarantee does not hold and therefore should not be used for +deduplication. + +* The ``close()`` method (if exists) is will be called if `open()` method exists and +returns successfully (irrespective of the return value), except if the Python +crashes in the middle. + +.. note:: Evolving. + +>>> # Print every row using a function +>>> writer = sdf.writeStream.foreach(lambda x: print(x)) +>>> # Print every row using a object with process() method +>>> class RowPrinter: +... def open(self, partition_id, epoch_id): +... print("Opened %d, %d" % (
[GitHub] spark pull request #21506: [SPARK-24485][SS] Measure and log elapsed time fo...
GitHub user HeartSaVioR opened a pull request: https://github.com/apache/spark/pull/21506 [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider ## What changes were proposed in this pull request? This patch measures and logs the elapsed time for each operation that communicates with the file system (mostly remote HDFS in production) in HDFSBackedStateStoreProvider, to help investigate latency issues. ## How was this patch tested? Manually tested. You can merge this pull request into a Git repository by running: $ git pull https://github.com/HeartSaVioR/spark SPARK-24485 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21506.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21506 commit d84f98fc978262f4165f78b3b223b8bb3151f735 Author: Jungtaek Lim Date: 2018-06-07T14:14:46Z [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21506 There are plenty of other debug messages that might hide the log messages added in this patch. Would we want to log them at INFO instead of DEBUG?
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 retest this, please
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 retest this please
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 @aalobaidi You can also merge #21506 (perhaps changing the log level, or modifying the patch to log the messages at INFO) and see the latencies of loading state, snapshotting, and cleaning up.
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 @aalobaidi One thing you may want to be aware of: from the executor's point of view, an executor must load at least one version of state into memory regardless of how many versions are cached. I guess you may get a better result if you unload the entire cache except the last version you just committed. A cache miss would then occur only in one of the three cases (`2. committed but the batch failed afterwards`), which happens rarely, and that is still better than cache misses in two of the three cases (2 and 3).
[GitHub] spark pull request #21504: [SPARK-24479][SS] Added config for registering st...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21504#discussion_r193945288

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -55,6 +57,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
   @GuardedBy("awaitTerminationLock")
   private var lastTerminatedQuery: StreamingQuery = null
+  try {
+    sparkSession.sparkContext.conf.get(STREAMING_QUERY_LISTENERS).foreach { classNames =>
+      Utils.loadExtensions(classOf[StreamingQueryListener], classNames,
+        sparkSession.sparkContext.conf).foreach(listener => {
+        addListener(listener)
+        logInfo(s"Registered listener ${listener.getClass.getName}")
--- End diff --
Either debug or info is fine with me, since it would add just a couple of log lines, and only once.
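For readers unfamiliar with the extension-loading pattern in the diff, it boils down to reflective instantiation plus a type check. Below is a rough Python analogue of the idea, not Spark's actual `Utils.loadExtensions`; the use of `dict`/`OrderedDict` is only to keep the sketch self-contained:

```python
import importlib

def load_extensions(base_cls, class_names):
    # Instantiate each fully-qualified class name and keep only the
    # instances that actually extend the expected base class.
    instances = []
    for name in class_names:
        module_name, _, cls_name = name.rpartition(".")
        cls = getattr(importlib.import_module(module_name), cls_name)
        instance = cls()
        if isinstance(instance, base_cls):
            instances.append(instance)
    return instances

# Stand-in for registering listener classes named in a config value.
listeners = load_extensions(dict, ["collections.OrderedDict"])
```

In the Scala code, `STREAMING_QUERY_LISTENERS` would supply the class names and `StreamingQueryListener` would be the base class.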
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21504 retest this, please
[GitHub] spark issue #21497: [SPARK-24466][SS] Fix TextSocketMicroBatchReader to be c...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21497 retest this please
[GitHub] spark issue #21504: [SPARK-24479][SS] Added config for registering streaming...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21504 The test failures were from Kafka. retest this, please
[GitHub] spark issue #21500: Scalable Memory option for HDFSBackedStateStore
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21500 @aalobaidi When a batch starts, the latest version of the state is read in order to start a new version. If the state has to be restored from a snapshot as well as delta files, restoring will incur huge latency. #21506 logs messages when loading state requires dealing with the (remote) filesystem. That's why I suggest merging my patch and running your case again.
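The restore cost being discussed comes from replaying delta files on top of the last snapshot. A toy illustration (not the provider's actual file format or code; the None-means-removal convention is an assumption of this sketch):

```python
def restore_state(snapshot, deltas):
    # Start from the most recent snapshot and replay each delta file in
    # version order; a value of None marks a key removed in that delta.
    state = dict(snapshot)
    for delta in deltas:
        for key, value in delta.items():
            if value is None:
                state.pop(key, None)
            else:
                state[key] = value
    return state
```

The more delta files have accumulated since the last snapshot, the more replay work happens at batch start, which is where the latency shows up.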
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total estimated size of states ...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 @jose-torres No problem. I expected the Spark community to be somewhat quiet during Spark Summit. Addressed the comment regarding renaming.
[GitHub] spark issue #21506: [SPARK-24485][SS] Measure and log elapsed time for files...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21506 cc. @tdas @jose-torres @jerryshao @arunmahadevan @HyukjinKwon