[GitHub] spark pull request #19824: [SPARK][STREAMING] Invoke onBatchCompletion() onl...

2017-12-03 Thread victor-wong
Github user victor-wong closed the pull request at:

https://github.com/apache/spark/pull/19824


---




[GitHub] spark issue #19824: [SPARK][STREAMING] Invoke onBatchCompletion() only when ...

2017-12-03 Thread victor-wong
Github user victor-wong commented on the issue:

https://github.com/apache/spark/pull/19824
  
@CodingCat 
Thank you:)


---




[GitHub] spark issue #19824: [SPARK][STREAMING] Invoke onBatchCompletion() only when ...

2017-12-01 Thread victor-wong
Github user victor-wong commented on the issue:

https://github.com/apache/spark/pull/19824
  
@CodingCat 

> One thing to note is that mute an event is a behavior change

I agree with that, so we should be careful about changing the current behavior. I will close the PR later.

> the remaining discussion becomes “what does complete mean in English?” which is not interesting to me to discuss

Maybe the remaining discussion should be how to let the user know that he will get a StreamingListenerBatchCompleted event even if the batch failed. What about adding a comment like this:

```scala
/** Called when processing of a batch of jobs has completed **(even if the batch failed)**. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
```

I was using StreamingListenerBatchCompleted to do some metadata checkpointing, which should only be done when the batch succeeded.
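
For what it is worth, a listener can already guard against this on its own side by inspecting the batch info. The sketch below is only my illustration (not part of this PR); `checkpointMetadata` is a hypothetical placeholder for the user's checkpointing logic:

```scala
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class MetadataCheckpointListener extends StreamingListener {

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // The batch-completed event is currently posted even when some jobs of the
    // batch failed, so inspect the per-output-operation failure reasons first.
    val anyFailure = batchCompleted.batchInfo.outputOperationInfos.values
      .exists(_.failureReason.isDefined)
    if (!anyFailure) {
      checkpointMetadata(batchCompleted.batchInfo.batchTime)
    }
  }

  // Hypothetical user-side metadata checkpointing, shown only as a placeholder.
  private def checkpointMetadata(batchTime: Time): Unit = {
    // ... write metadata for the successfully completed batch ...
  }
}
```

Such a listener would be registered with `streamingContext.addStreamingListener(new MetadataCheckpointListener)`.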



---




[GitHub] spark issue #19824: [SPARK][STREAMING] Invoke onBatchCompletion() only when ...

2017-12-01 Thread victor-wong
Github user victor-wong commented on the issue:

https://github.com/apache/spark/pull/19824
  
@CodingCat please check out the difference between the two PRs.

```diff
 if (jobSet.hasCompleted) {
-  jobSets.remove(jobSet.time)
-  jobGenerator.onBatchCompletion(jobSet.time)
-  logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
-    jobSet.totalDelay / 1000.0, jobSet.time.toString,
-    jobSet.processingDelay / 1000.0
-  ))
   listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
 }
```
As shown above, in https://github.com/apache/spark/pull/16542, if a Job failed, listenerBus still posts a StreamingListenerBatchCompleted, which I believe is incorrect, because the Batch is not completed (one of its Jobs has failed).


---




[GitHub] spark issue #19824: [SPARK][STREAMING] Invoke onBatchCompletion() only when ...

2017-11-30 Thread victor-wong
Github user victor-wong commented on the issue:

https://github.com/apache/spark/pull/19824
  
@CodingCat Yes, this PR addresses the same issue as https://github.com/apache/spark/pull/16542, but I think this is a better way to solve it.
If a Job fails, we should not remove it from its JobSet, so `jobSet.hasCompleted` will return false and we will not receive a StreamingListenerBatchCompleted.
In other words, if a Job fails, we should consider the Batch as not completed.
I am not confident about my English, so if I am not describing this clearly, please let me know.


---




[GitHub] spark issue #19824: [SPARK][STREAMING] Invoke onBatchCompletion() only when ...

2017-11-30 Thread victor-wong
Github user victor-wong commented on the issue:

https://github.com/apache/spark/pull/19824
  
@viirya Sorry for the misleading title; I have changed it now.


---




[GitHub] spark pull request #19824: Revert "[SPARK-18905][STREAMING] Fix the issue of...

2017-11-27 Thread victor-wong
GitHub user victor-wong opened a pull request:

https://github.com/apache/spark/pull/19824

Revert "[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset 
from JobScheduler.jobSets"

## What changes were proposed in this pull request?

The code changes in PR https://github.com/apache/spark/pull/16542 are quite confusing to me:

https://github.com/apache/spark/blob/5a02e3a2ac8a25d92d98d3b3b0d1173dddb9cc91/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala#L203

```scala
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
          jobSet.totalDelay / 1000.0, jobSet.time.toString,
          jobSet.processingDelay / 1000.0
        ))
      }
  }
}
```

If a Job failed and the JobSet containing it has completed, listenerBus will post a StreamingListenerBatchCompleted, while jobGenerator will not invoke onBatchCompletion. So is the batch completed or not?

The key point is whether we should consider a Batch as completed when one of its Jobs has failed.

I think someone who registers a listener for StreamingListenerBatchCompleted only wants to be notified when the batch finishes with no error. So if a Job fails, we should not remove it from its JobSet, and the JobSet is therefore not considered completed.
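
Concretely, the idea amounts to something like the following sketch of `handleJobCompletion` (my simplification for illustration, not the actual diff in this PR): the job is only marked as completed in its JobSet when it succeeded, so a failed job keeps the JobSet incomplete and no batch-completed event is posted.

```scala
// Sketch only: mark a job as completed in its JobSet only on success, so
// jobSet.hasCompleted stays false for a batch that contains a failed job.
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  job.result match {
    case Failure(e) =>
      // The failed job is reported but never marked complete in its JobSet.
      reportError("Error running job " + job, e)
    case _ =>
      jobSet.handleJobCompletion(job)
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
      }
  }
}
```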

## How was this patch tested?

existing tests

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/victor-wong/spark revert-job-completion

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19824.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19824


commit aafe7b62f80ff1e86f6c528711d24e7de54536c5
Author: wangjiasheng 
Date:   2017-11-27T08:28:48Z

Revert "[SPARK-18905][STREAMING] Fix the issue of removing a failed jobset 
from JobScheduler.jobSets"

This reverts commit f8db8945f25cb884278ff8841bac5f6f28f0dec6.

commit e4e57cca9b0d21db8ad6292f8fcbde2dd316d7b7
Author: wangjiasheng 
Date:   2017-11-27T08:31:24Z

[SPARK][STREAMING] Invoke onBatchCompletion() only when all jobs in the 
JobSet are Success




---




[GitHub] spark issue #17782: Reload credentials file config when app starts with chec...

2017-05-10 Thread victor-wong
Github user victor-wong commented on the issue:

https://github.com/apache/spark/pull/17782
  
@srowen 
Thanks for replying. I tested with the master branch and it turned out the issue still exists.
I created a new PR against the master branch: https://github.com/apache/spark/pull/17937.
Thanks!


---



[GitHub] spark issue #17937: Reload credentials file config when app starts with chec...

2017-05-10 Thread victor-wong
Github user victor-wong commented on the issue:

https://github.com/apache/spark/pull/17937
  
See the comments on the previous PR: https://github.com/apache/spark/pull/17782.


---



[GitHub] spark pull request #17782: Reload credentials file config when app starts wi...

2017-05-10 Thread victor-wong
Github user victor-wong closed the pull request at:

https://github.com/apache/spark/pull/17782


---



[GitHub] spark pull request #17937: Reload credentials file config when app starts wi...

2017-05-10 Thread victor-wong
GitHub user victor-wong opened a pull request:

https://github.com/apache/spark/pull/17937

Reload credentials file config when app starts with checkpoint file i…

## What changes were proposed in this pull request?

Currently the credentials file configuration is recovered from the checkpoint file when a Spark Streaming application is restarted, which leads to some unwanted behavior, for example:

1. Submit a Spark Streaming application using a keytab file, with checkpointing enabled, in yarn-cluster mode:

> spark-submit --master yarn-cluster --principal  --keytab xxx ...

2. Stop the Spark Streaming application.
3. Resubmit the application after a period of time (e.g. one day).
4. The credentials file configuration is recovered from the checkpoint file, so the value of "spark.yarn.credentials.file" points to the old staging directory (i.e. hdfs:///.sparkStaging/application_/credentials-, where application_ is the application id of the previous application, which has been stopped).
5. When an executor is launched, ExecutorDelegationTokenUpdater immediately updates credentials from that credentials file. Since the credentials file was generated one day (or more) earlier, it has already expired, so after a while the executor keeps failing.

Some useful logs are shown below:

>2017-04-27,15:08:08,098 INFO org.apache.spark.executor.CoarseGrainedExecutorBackend: Will periodically update credentials from: hdfs:///application_/credentials-
>2017-04-27,15:08:12,519 INFO org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater: Reading new delegation tokens from hdfs:///application_1/credentials--xx
>2017-04-27,15:08:12,661 INFO org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater: Tokens updated from credentials file.
...
>2017-04-27,15:08:48,156 WARN org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token  for xx) can't be found in cache
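
The direction of the fix is to take such properties from the newly submitted application rather than from the checkpoint. Below is a minimal sketch of that idea (my illustration, not the actual patch; it assumes both the SparkConf restored from the checkpoint and the SparkConf of the current submission are available, and the property list is illustrative, not exhaustive):

```scala
import org.apache.spark.SparkConf

// Sketch: when rebuilding a SparkConf from checkpoint data, override
// properties that must reflect the *current* submission, not the old one.
def overrideFromCurrentSubmission(restored: SparkConf, current: SparkConf): SparkConf = {
  val propertiesToReload = Seq(
    "spark.yarn.credentials.file",   // must point at the new app's staging dir
    "spark.yarn.principal",
    "spark.yarn.keytab")
  propertiesToReload.foreach { key =>
    current.getOption(key) match {
      case Some(value) => restored.set(key, value)
      case None        => restored.remove(key)
    }
  }
  restored
}
```

With something like this, "spark.yarn.credentials.file" would always point at the staging directory of the currently running application.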



## How was this patch tested?

manual tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/victor-wong/spark fix-credential-file

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17937.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17937


commit fac97c69b8087fda62b776384539301df0230ae2
Author: jiasheng.wang 
Date:   2017-05-10T09:35:11Z

Reload credentials file config when app starts with checkpoint file in 
cluster mode




---



[GitHub] spark issue #17782: Reload credentials file config when app starts with chec...

2017-05-09 Thread victor-wong
Github user victor-wong commented on the issue:

https://github.com/apache/spark/pull/17782
  
@jerryshao 
Sorry for replying late. This issue does exist in the master branch, as I described in https://issues.apache.org/jira/browse/SPARK-19688.
What should I do to elaborate on this issue? Should I submit a new PR against the master branch?
Thanks.


---



[GitHub] spark pull request #17782: Reload credentials file config when app starts wi...

2017-04-27 Thread victor-wong
GitHub user victor-wong opened a pull request:

https://github.com/apache/spark/pull/17782

Reload credentials file config when app starts with checkpoint file i…


## What changes were proposed in this pull request?

Currently the credentials file configuration is recovered from the checkpoint file when a Spark Streaming application is restarted, which leads to some unwanted behavior, for example:

1. Submit a Spark Streaming application using a keytab file, with checkpointing enabled, in yarn-cluster mode:

> spark-submit --master yarn-cluster --principal  --keytab xxx ...

2. Stop the Spark Streaming application.
3. Resubmit the application after a period of time (e.g. one day).
4. The credentials file configuration is recovered from the checkpoint file, so the value of "spark.yarn.credentials.file" points to the old staging directory (i.e. hdfs:///.sparkStaging/application_/credentials-, where application_ is the application id of the previous application, which has been stopped).
5. When an executor is launched, ExecutorDelegationTokenUpdater immediately updates credentials from that credentials file. Since the credentials file was generated one day (or more) earlier, it has already expired, so after a while the executor keeps failing.

Some useful logs are shown below:

>2017-04-27,15:08:08,098 INFO org.apache.spark.executor.CoarseGrainedExecutorBackend: Will periodically update credentials from: hdfs:///application_/credentials-
>2017-04-27,15:08:12,519 INFO org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater: Reading new delegation tokens from hdfs:///application_1/credentials--xx
>2017-04-27,15:08:12,661 INFO org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater: Tokens updated from credentials file.
...
>2017-04-27,15:08:48,156 WARN org.apache.hadoop.ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token  for xx) can't be found in cache



## How was this patch tested?

manual tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/victor-wong/spark github-patch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17782.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17782


commit 34ce0bab0f5fbdb25efc75dbe4acc412f4bdd791
Author: jiasheng.wang 
Date:   2017-04-27T06:42:12Z

Reload credentials file config when app starts with checkpoint file in 
cluster mode




---