I remember us having issues with Joda classes not serializing properly and coming out null "on the other side" in tasks.
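If that is what is happening here (an assumption on my part, nothing in the trace proves it), one way to sidestep it is to keep plain epoch milliseconds in the RDD and only build date objects at the edges. A minimal sketch using JDK classes only; the object name `AllDatesSketch`, the `Long`-based signature, and the choice of UTC are mine, not from the original job:

```scala
import java.util.{Calendar, TimeZone}

object AllDatesSketch {
  // Hypothetical replacement for allDates: returns one epoch-millis value per day,
  // from startMillis (inclusive) up to endMillis (exclusive).
  // Only Long values cross task boundaries, so no Joda object has to be serialized.
  def allDates(startMillis: Long, endMillis: Long): Seq[Long] = {
    // The Calendar is created inside the function, so it never leaves the task.
    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
    cal.setTimeInMillis(startMillis)
    val days = Seq.newBuilder[Long]
    while (cal.getTimeInMillis < endMillis) {
      days += cal.getTimeInMillis
      cal.add(Calendar.DAY_OF_MONTH, 1) // step by one calendar day
    }
    days.result()
  }

  def main(args: Array[String]): Unit = {
    val dayMs = 24L * 60 * 60 * 1000
    // Three days starting at the epoch: yields three entries.
    println(allDates(0L, 3 * dayMs))
  }
}
```

In the job this would be called as something like `.flatMapValues(first => allDates(first, endMillis))`, with the pair values mapped to `Long` beforehand, and the values turned back into `DateTime` with `new DateTime(millis)` only where they are actually needed.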

On Thu, Nov 12, 2015 at 10:12 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Even if log4j didn't work, you can still get some clue by wrapping the
> following call in a try block:
>
> currentDate = currentDate.plusDays(1)
>
> catching the NPE and rethrowing with an exception that shows the value of
> currentDate
>
> Cheers
>
> On Thu, Nov 12, 2015 at 1:56 AM, Romain Sagean <romain.sag...@hupi.fr>
> wrote:
>
>> I still can't make the logger work inside a map function. I can use
>> "logInfo("")" in the main but not in the function. Anyway, I rewrote my
>> program to use java.util.Date instead of Joda-Time and I don't get the
>> NPE anymore.
>>
>> I will stick with this solution for the moment even if I find java Date
>> ugly.
>>
>> Thanks for your help.
>>
>> 2015-11-11 15:54 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>
>>> In case you need to adjust log4j properties, see the following thread:
>>>
>>> http://search-hadoop.com/m/q3RTtJHkzb1t0J66&subj=Re+Spark+Streaming+Log4j+Inside+Eclipse
>>>
>>> Cheers
>>>
>>> On Tue, Nov 10, 2015 at 1:28 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> I took a look at
>>>> https://github.com/JodaOrg/joda-time/blob/master/src/main/java/org/joda/time/DateTime.java
>>>> Looks like the NPE came from the line below:
>>>>
>>>> long instant = getChronology().days().add(getMillis(), days);
>>>>
>>>> Maybe catch the NPE and print out the value of currentDate to see if
>>>> there are more clues?
>>>>
>>>> Cheers
>>>>
>>>> On Tue, Nov 10, 2015 at 12:55 PM, Romain Sagean <romain.sag...@hupi.fr>
>>>> wrote:
>>>>
>>>>> See below a more complete version of the code.
>>>>> The firstDate (previously minDate) should not be null; I even added an
>>>>> extra "filter( _._2 != null)" before the flatMap and the error is still
>>>>> there.
>>>>>
>>>>> What I don't understand is why I get the error on dateSeq.last.plusDays
>>>>> and not on dateSeq.last.isBefore (in the condition).
>>>>>
>>>>> I also tried changing the allDates function to use a while loop but i got
>>>>> the same error.
>>>>>
>>>>> def allDates(dateStart: DateTime, dateEnd: DateTime): Seq[DateTime] = {
>>>>>   var dateSeq = Seq(dateStart)
>>>>>   var currentDate = dateStart
>>>>>   while (currentDate.isBefore(dateEnd)){
>>>>>     dateSeq = dateSeq :+ currentDate
>>>>>     currentDate = currentDate.plusDays(1)
>>>>>   }
>>>>>   return dateSeq
>>>>> }
>>>>>
>>>>> val videoAllDates = events.select("player_id", "current_ts")
>>>>>   .filter("player_id is not null")
>>>>>   .filter("current_ts is not null")
>>>>>   .map( row => (row.getString(0), timestampToDate(row.getString(1))))
>>>>>   .filter(r => r._2.isAfter(minimumDate))
>>>>>   .reduceByKey(minDateTime)
>>>>>   .flatMapValues( firstDate => allDates(firstDate, endDate))
>>>>>
>>>>> And the stack trace.
>>>>>
>>>>> 15/11/10 21:10:36 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 2 to sparkexecu...@r610-2.pro.hupi.loc:50821
>>>>> 15/11/10 21:10:36 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 695 bytes
>>>>> 15/11/10 21:10:36 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to sparkexecu...@r610-2.pro.hupi.loc:50821
>>>>> 15/11/10 21:10:36 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 680 bytes
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Starting task 206.0 in stage 3.0 (TID 798, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
>>>>> 15/11/10 21:10:36 WARN TaskSetManager: Lost task 205.0 in stage 3.0 (TID 797, R610-2.pro.hupi.loc): java.lang.NullPointerException
>>>>>     at org.joda.time.DateTime.plusDays(DateTime.java:1070)
>>>>>     at Heatmap$.allDates(heatmap.scala:34)
>>>>>     at Heatmap$$anonfun$12.apply(heatmap.scala:97)
>>>>>     at Heatmap$$anonfun$12.apply(heatmap.scala:97)
>>>>>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$flatMapValues$1$$anonfun$apply$16.apply(PairRDDFunctions.scala:686)
>>>>>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$flatMapValues$1$$anonfun$apply$16.apply(PairRDDFunctions.scala:685)
>>>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>     at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>>>>>     at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>>>>>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>>>     at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Starting task 205.1 in stage 3.0 (TID 799, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Finished task 206.0 in stage 3.0 (TID 798) in 13 ms on R610-2.pro.hupi.loc (182/410)
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Starting task 207.0 in stage 3.0 (TID 800, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Lost task 205.1 in stage 3.0 (TID 799) on executor R610-2.pro.hupi.loc: java.lang.NullPointerException (null) [duplicate 1]
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Starting task 205.2 in stage 3.0 (TID 801, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Finished task 207.0 in stage 3.0 (TID 800) in 14 ms on R610-2.pro.hupi.loc (183/410)
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Starting task 208.0 in stage 3.0 (TID 802, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Lost task 205.2 in stage 3.0 (TID 801) on executor R610-2.pro.hupi.loc: java.lang.NullPointerException (null) [duplicate 2]
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Starting task 205.3 in stage 3.0 (TID 803, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Finished task 208.0 in stage 3.0 (TID 802) in 12 ms on R610-2.pro.hupi.loc (184/410)
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Starting task 209.0 in stage 3.0 (TID 804, R610-2.pro.hupi.loc, PROCESS_LOCAL, 4416 bytes)
>>>>> 15/11/10 21:10:36 INFO TaskSetManager: Lost task 205.3 in stage 3.0 (TID 803) on executor R610-2.pro.hupi.loc: java.lang.NullPointerException (null) [duplicate 3]
>>>>> 15/11/10 21:10:36 ERROR TaskSetManager: Task 205 in stage 3.0 failed 4 times; aborting job
>>>>> 15/11/10 21:10:36 INFO YarnScheduler: Cancelling stage 3
>>>>> 15/11/10 21:10:36 INFO YarnScheduler: Stage 3 was cancelled
>>>>> 15/11/10 21:10:36 INFO DAGScheduler: Job 1 failed: saveAsTextFile at heatmap.scala:116, took 45,476562 s
>>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 205 in stage 3.0 failed 4 times, most recent failure: Lost task 205.3 in stage 3.0 (TID 803, R610-2.pro.hupi.loc): java.lang.NullPointerException
>>>>>     at org.joda.time.DateTime.plusDays(DateTime.java:1070)
>>>>>     at Heatmap$.allDates(heatmap.scala:34)
>>>>>     at Heatmap$$anonfun$12.apply(heatmap.scala:97)
>>>>>     at Heatmap$$anonfun$12.apply(heatmap.scala:97)
>>>>>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$flatMapValues$1$$anonfun$apply$16.apply(PairRDDFunctions.scala:686)
>>>>>     at org.apache.spark.rdd.PairRDDFunctions$$anonfun$flatMapValues$1$$anonfun$apply$16.apply(PairRDDFunctions.scala:685)
>>>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>>>>     at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>>>>>     at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>>>>>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>>>>     at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Driver stacktrace:
>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1210)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1199)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1198)
>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1198)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
>>>>>     at scala.Option.foreach(Option.scala:236)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1400)
>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1361)
>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>> 15/11/10 21:10:36 WARN TaskSetManager: Lost task 209.0 in stage 3.0 (TID 804, R610-2.pro.hupi.loc): TaskKilled (killed intentionally)
>>>>> SLF4J: Class path contains multiple SLF4J bindings.
>>>>> SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/jars/avro-tools-1.7.6-cdh5.4.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>>>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>>>>
>>>>> 2015-11-10 18:39 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>>>>
>>>>>> Can you show the stack trace for the NPE ?
>>>>>>
>>>>>> Which release of Spark are you using ?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Tue, Nov 10, 2015 at 8:20 AM, romain sagean <romain.sag...@hupi.fr>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi community,
>>>>>>> I try to apply the function below during a flatMapValues or a map
>>>>>>> but I get a nullPointerException with the plusDays(1). What did I miss ?
>>>>>>>
>>>>>>> def allDates(dateSeq: Seq[DateTime], dateEnd: DateTime): Seq[DateTime] = {
>>>>>>>   if (dateSeq.last.isBefore(dateEnd)){
>>>>>>>     allDates(dateSeq :+ dateSeq.last.plusDays(1), dateEnd)
>>>>>>>   } else {
>>>>>>>     dateSeq
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> val videoAllDates = .select("player_id", "mindate").flatMapValues(
>>>>>>>   minDate => allDates(Seq(minDate), endDate))
>>>>>>>
>>>>>
>>>>> --
>>>>> *Romain Sagean*
>>>>>
>>>>> *romain.sag...@hupi.fr <romain.sag...@hupi.fr>*