Github user jerryshao commented on a diff in the pull request:
https://github.com/apache/spark/pull/1291#discussion_r14749902
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
@@ -288,23 +288,33 @@ abstract class DStream[T: ClassTag] (
// (based on sliding time of this DStream), then generate the RDD
case None => {
if (isTimeValid(time)) {
- compute(time) match {
- case Some(newRDD) =>
- if (storageLevel != StorageLevel.NONE) {
- newRDD.persist(storageLevel)
- logInfo("Persisting RDD " + newRDD.id + " for time " +
- time + " to " + storageLevel + " at time " + time)
- }
- if (checkpointDuration != null &&
- (time - zeroTime).isMultipleOf(checkpointDuration)) {
- newRDD.checkpoint()
- logInfo("Marking RDD " + newRDD.id + " for time " + time +
- " for checkpointing at time " + time)
- }
- generatedRDDs.put(time, newRDD)
- Some(newRDD)
- case None =>
+ try {
+ compute(time) match {
+ case Some(newRDD) =>
+ if(newRDD.partitions.size==0){
+ None
+ }else{
+ if (storageLevel != StorageLevel.NONE) {
+ newRDD.persist(storageLevel)
+ logInfo("Persisting RDD " + newRDD.id + " for time " +
+ time + " to " + storageLevel + " at time " + time)
+ }
+ if (checkpointDuration != null &&
+ (time - zeroTime).isMultipleOf(checkpointDuration)) {
+ newRDD.checkpoint()
+ logInfo("Marking RDD " + newRDD.id + " for time " + time +
+ " for checkpointing at time " + time)
+ }
+ generatedRDDs.put(time, newRDD)
+ Some(newRDD)
+ }
+ case None =>
+ None
+ }
+ } catch {
+ case e: NullPointerException =>{
--- End diff --
The whitespace needs fixing here too.
Also, do we need to wrap this in a try-catch at all? Which line throws the NPE?
I don't think catching NPE is a good choice here. An NPE may be introduced by
an unknown bug, and we should fix that bug rather than catch it at runtime.
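
To illustrate the alternative, here is a minimal sketch of handling a possibly-null value where it originates instead of catching NPE around the whole block. It is not Spark code: `NullGuardSketch`, `rawCompute`, and `safeCompute` are hypothetical names, and a plain `Seq[Int]` stands in for the RDD.

```scala
// Hypothetical stand-in for the compute path; none of these names exist in Spark.
object NullGuardSketch {
  // Assumption for illustration: an upstream call that may return null
  // (odd times) or an empty batch (negative even times).
  def rawCompute(time: Long): Seq[Int] =
    if (time % 2 != 0) null
    else if (time < 0) Seq.empty
    else Seq(1, 2, 3)

  // Option(x) is None when x is null, so the null is handled at its source
  // and no NullPointerException has to be caught later.
  // The filter also drops empty batches, mirroring the partitions.size == 0 check.
  def safeCompute(time: Long): Option[Seq[Int]] =
    Option(rawCompute(time)).filter(_.nonEmpty)
}
```

With this shape the caller only matches on `Some`/`None`, and an NPE from any other source still surfaces as a real bug instead of being swallowed.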