[GitHub] spark pull request #19824: [SPARK][STREAMING] Invoke onBatchCompletion() onl...
Github user victor-wong closed the pull request at: https://github.com/apache/spark/pull/19824
[GitHub] spark issue #19824: [SPARK][STREAMING] Invoke onBatchCompletion() only when ...
Github user victor-wong commented on the issue: https://github.com/apache/spark/pull/19824

@CodingCat Thank you :)
[GitHub] spark issue #19824: [SPARK][STREAMING] Invoke onBatchCompletion() only when ...
Github user victor-wong commented on the issue: https://github.com/apache/spark/pull/19824

@CodingCat

> One thing to note is that muting an event is a behavior change

I agree with that, so we should be careful about changing the current behavior. I will close the PR later.

> the remaining discussion becomes "what does complete mean in English?" which is not interesting to me to discuss

Maybe the remaining discussion should be how to let the user know that they will get a StreamingListenerBatchCompleted event even if the batch failed. What about adding a comment like this:

```scala
/** Called when processing of a batch of jobs has completed **(even if the batch failed)**. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
```

I was using StreamingListenerBatchCompleted for some metadata checkpointing, which should happen only when the batch succeeds.
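In the meantime, a listener can filter out failed batches itself. Here is a minimal sketch (not part of the PR), assuming the public StreamingListener API, where each OutputOperationInfo in BatchInfo.outputOperationInfos carries an optional failureReason; CheckpointOnSuccessListener and checkpointMetadata are hypothetical names:

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Only run side effects (e.g. metadata checkpointing) when every output
// operation in the batch finished without a failure reason.
class CheckpointOnSuccessListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val anyFailed = batchCompleted.batchInfo.outputOperationInfos.values
      .exists(_.failureReason.nonEmpty)
    if (!anyFailed) {
      checkpointMetadata(batchCompleted.batchInfo.batchTime.milliseconds)
    }
  }

  // Placeholder for the user's metadata checkpointing logic.
  private def checkpointMetadata(batchTimeMs: Long): Unit = ()
}
```

Registered via ssc.addStreamingListener(new CheckpointOnSuccessListener), this gives "completed successfully" semantics without changing Spark's behavior.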
[GitHub] spark issue #19824: [SPARK][STREAMING] Invoke onBatchCompletion() only when ...
Github user victor-wong commented on the issue: https://github.com/apache/spark/pull/19824

@CodingCat Please check out the difference between the two PRs:

```diff
 if (jobSet.hasCompleted) {
-  jobSets.remove(jobSet.time)
-  jobGenerator.onBatchCompletion(jobSet.time)
-  logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
-    jobSet.totalDelay / 1000.0, jobSet.time.toString,
-    jobSet.processingDelay / 1000.0
-  ))
   listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
 }
```

As shown above, in https://github.com/apache/spark/pull/16542, if a Job failed, listenerBus still posts a StreamingListenerBatchCompleted. I believe that is incorrect, because the Batch is not completed (one of its Jobs failed).
[GitHub] spark issue #19824: [SPARK][STREAMING] Invoke onBatchCompletion() only when ...
Github user victor-wong commented on the issue: https://github.com/apache/spark/pull/19824

@CodingCat Yes, this PR addresses the same issue as https://github.com/apache/spark/pull/16542, but I think this is a better way to solve it. If a Job failed, we should not remove it from its JobSet, so that `jobSet.hasCompleted` returns false. As a result, we will not receive a StreamingListenerBatchCompleted. What I want to say is that if a Job has failed, we should consider the Batch as not completed. I am not confident about my English; if I am not describing it clearly, please let me know.
[GitHub] spark issue #19824: [SPARK][STREAMING] Invoke onBatchCompletion() only when ...
Github user victor-wong commented on the issue: https://github.com/apache/spark/pull/19824

@viirya Sorry for the misleading title; I have changed it now.
[GitHub] spark pull request #19824: Revert "[SPARK-18905][STREAMING] Fix the issue of...
GitHub user victor-wong opened a pull request: https://github.com/apache/spark/pull/19824

Revert "[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset from JobScheduler.jobSets"

## What changes were proposed in this pull request?

The code changes in PR https://github.com/apache/spark/pull/16542 are very confusing to me:
https://github.com/apache/spark/blob/5a02e3a2ac8a25d92d98d3b3b0d1173dddb9cc91/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203

```scala
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
          jobSet.totalDelay / 1000.0, jobSet.time.toString,
          jobSet.processingDelay / 1000.0
        ))
      }
  }
}
```

If a Job failed and the JobSet containing it has completed, listenerBus posts a StreamingListenerBatchCompleted, while jobGenerator never invokes onBatchCompletion. So is the batch completed or not?

The key point is whether we should consider a Batch completed when one of its Jobs has failed. I think anyone who registers a listener for StreamingListenerBatchCompleted wants to be notified only when the batch finishes with no error. So if a Job has failed, we should not remove it from its JobSet; then the JobSet does not count as completed.

## How was this patch tested?

Existing tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/victor-wong/spark revert-job-completion

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19824.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19824

commit aafe7b62f80ff1e86f6c528711d24e7de54536c5
Author: wangjiasheng
Date: 2017-11-27T08:28:48Z

    Revert "[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset from JobScheduler.jobSets"

    This reverts commit f8db8945f25cb884278ff8841bac5f6f28f0dec6.

commit e4e57cca9b0d21db8ad6292f8fcbde2dd316d7b7
Author: wangjiasheng
Date: 2017-11-27T08:31:24Z

    [SPARK][STREAMING] Invoke onBatchCompletion() only when all jobs in the JobSet are Success
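To make the proposed semantics concrete, here is a minimal, self-contained sketch (not the actual patch): a failed job is never removed from the set of incomplete jobs, so `hasCompleted` stays false and neither StreamingListenerBatchCompleted nor onBatchCompletion fires for that batch. The Job class below is a hypothetical stand-in for the real streaming Job, which records its outcome as a `Try`:

```scala
import scala.collection.mutable
import scala.util.{Success, Try}

// Hypothetical stand-in for the streaming scheduler's Job, which records
// its outcome as a Try once it has run.
final class Job(val id: String) {
  var result: Try[Unit] = Success(())
}

// Proposed semantics: only a successful job is removed from the set of
// incomplete jobs, so a batch containing a failed job never reports
// hasCompleted == true, and the scheduler never posts
// StreamingListenerBatchCompleted for it.
final class JobSetSketch(jobs: Seq[Job]) {
  private val incompleteJobs = mutable.Set[Job](jobs: _*)

  def handleJobCompletion(job: Job): Unit = job.result match {
    case Success(_) => incompleteJobs -= job // success: job is done
    case _          =>                       // failure: batch stays incomplete
  }

  def hasCompleted: Boolean = incompleteJobs.isEmpty
}
```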
[GitHub] spark issue #17782: Reload credentials file config when app starts with chec...
Github user victor-wong commented on the issue: https://github.com/apache/spark/pull/17782

@srowen Thanks for replying. I tested with the master branch and it turned out the issue still exists. I have created a new PR against the master branch: https://github.com/apache/spark/pull/17937. Thanks!
[GitHub] spark issue #17937: Reload credentials file config when app starts with chec...
Github user victor-wong commented on the issue: https://github.com/apache/spark/pull/17937

See the comments on the previous PR: https://github.com/apache/spark/pull/17782.
[GitHub] spark pull request #17782: Reload credentials file config when app starts wi...
Github user victor-wong closed the pull request at: https://github.com/apache/spark/pull/17782
[GitHub] spark pull request #17937: Reload credentials file config when app starts wi...
GitHub user victor-wong opened a pull request: https://github.com/apache/spark/pull/17937

Reload credentials file config when app starts with checkpoint file i…

## What changes were proposed in this pull request?

Currently the credentials file configuration is recovered from the checkpoint file when a Spark Streaming application is restarted, which leads to some unwanted behavior, for example:

1. Submit a Spark Streaming application using a keytab file, with checkpointing enabled, in yarn-cluster mode:

   > spark-submit --master yarn-cluster --principal --keytab xxx ...

2. Stop the Spark Streaming application.
3. Resubmit the application after a period of time (e.g. one day).
4. The credentials file configuration is recovered from the checkpoint file, so the value of "spark.yarn.credentials.file" points to the old staging directory (i.e. hdfs:///.sparkStaging/application_/credentials-, where application_ is the application id of the previous, stopped application).
5. When an executor launches, ExecutorDelegationTokenUpdater immediately updates credentials from the credentials file. Since the credentials file was generated one day ago (or more), it has already expired, so after a period of time the executor keeps failing. Some useful logs are shown below:

> 2017-04-27,15:08:08,098 INFO org.apache.spark.executor.CoarseGrainedExecutorBackend: Will periodically update credentials from: hdfs:///application_/credentials-
> 2017-04-27,15:08:12,519 INFO org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater: Reading new delegation tokens from hdfs:///application_1/credentials--xx
> 2017-04-27,15:08:12,661 INFO org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater: Tokens updated from credentials file. ...
> 2017-04-27,15:08:48,156 WARN org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token for xx) can't be found in cache

## How was this patch tested?

Manual tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/victor-wong/spark fix-credential-file

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17937.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17937

commit fac97c69b8087fda62b776384539301df0230ae2
Author: jiasheng.wang
Date: 2017-05-10T09:35:11Z

    Reload credentials file config when app starts with checkpoint file in cluster mode
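The fix described in the title amounts to taking such properties from the freshly submitted configuration rather than from the checkpoint. A minimal sketch of that pattern, assuming the checkpoint stores the old SparkConf as key/value pairs; createConfFromCheckpoint and the single-entry property list are illustrative, not the actual patch:

```scala
import org.apache.spark.SparkConf

// Rebuild a SparkConf from checkpointed key/value pairs, then overwrite
// selected properties with values from the current submission, so a
// restarted app never points at the previous app's staging directory.
def createConfFromCheckpoint(checkpointedPairs: Seq[(String, String)]): SparkConf = {
  val restored = new SparkConf(loadDefaults = false).setAll(checkpointedPairs)
  val current = new SparkConf(loadDefaults = true) // conf of the new submission

  // Properties that must reflect the new application, not the old one.
  val propertiesToReload = Seq("spark.yarn.credentials.file")

  propertiesToReload.foreach { prop =>
    current.getOption(prop) match {
      case Some(value) => restored.set(prop, value) // take the fresh value
      case None        => restored.remove(prop)     // a stale value must not survive
    }
  }
  restored
}
```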
[GitHub] spark issue #17782: Reload credentials file config when app starts with chec...
Github user victor-wong commented on the issue: https://github.com/apache/spark/pull/17782

@jerryshao Sorry for the late reply. This issue does exist in the master branch, as I described in https://issues.apache.org/jira/browse/SPARK-19688. What should I do to elaborate further on this issue? Should I submit a new PR against the master branch? Thanks.
[GitHub] spark pull request #17782: Reload credentials file config when app starts wi...
GitHub user victor-wong opened a pull request: https://github.com/apache/spark/pull/17782

Reload credentials file config when app starts with checkpoint file i…

## What changes were proposed in this pull request?

Currently the credentials file configuration is recovered from the checkpoint file when a Spark Streaming application is restarted, which leads to some unwanted behavior, for example:

1. Submit a Spark Streaming application using a keytab file, with checkpointing enabled, in yarn-cluster mode:

   > spark-submit --master yarn-cluster --principal --keytab xxx ...

2. Stop the Spark Streaming application.
3. Resubmit the application after a period of time (e.g. one day).
4. The credentials file configuration is recovered from the checkpoint file, so the value of "spark.yarn.credentials.file" points to the old staging directory (i.e. hdfs:///.sparkStaging/application_/credentials-, where application_ is the application id of the previous, stopped application).
5. When an executor launches, ExecutorDelegationTokenUpdater immediately updates credentials from the credentials file. Since the credentials file was generated one day ago (or more), it has already expired, so after a period of time the executor keeps failing. Some useful logs are shown below:

> 2017-04-27,15:08:08,098 INFO org.apache.spark.executor.CoarseGrainedExecutorBackend: Will periodically update credentials from: hdfs:///application_/credentials-
> 2017-04-27,15:08:12,519 INFO org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater: Reading new delegation tokens from hdfs:///application_1/credentials--xx
> 2017-04-27,15:08:12,661 INFO org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater: Tokens updated from credentials file. ...
> 2017-04-27,15:08:48,156 WARN org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token for xx) can't be found in cache

## How was this patch tested?

Manual tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/victor-wong/spark github-patch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17782.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17782

commit 34ce0bab0f5fbdb25efc75dbe4acc412f4bdd791
Author: jiasheng.wang
Date: 2017-04-27T06:42:12Z

    Reload credentials file config when app starts with checkpoint file in cluster mode