bq. aggregationMap.put(countryCode,requestCountPerCountry+1);

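If the NPE came from the line above, requestCountPerCountry was probably null: aggregationMap.get(countryCode) returns null when the key is absent, and requestCountPerCountry+1 then fails while unboxing it.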
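
A minimal sketch of a null-safe update, assuming the counts live in a ConcurrentHashMap as in the quoted code. The class name CountryCounts and both method names are hypothetical, chosen only for illustration. The first variant relies on Map.merge, which needs Java 8 or later; the second is a plain null check that also works on older JVMs but is not atomic across the get and the put.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the original program.
class CountryCounts {

    // Java 8+: merge() stores 1 if the key is missing, otherwise
    // atomically replaces the value with (old value + 1).
    static int increment(ConcurrentHashMap<String, Integer> counts, String countryCode) {
        return counts.merge(countryCode, 1, Integer::sum);
    }

    // Older JVMs: check for null before unboxing. The get/put pair is
    // not atomic, so concurrent callers could lose an update.
    static int incrementWithNullCheck(Map<String, Integer> counts, String countryCode) {
        Integer current = counts.get(countryCode);
        int updated = (current == null) ? 1 : current + 1;
        counts.put(countryCode, updated);
        return updated;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> aggregationMap = new ConcurrentHashMap<String, Integer>();
        increment(aggregationMap, "IN");              // key absent: stored as 1
        increment(aggregationMap, "IN");              // key present: becomes 2
        incrementWithNullCheck(aggregationMap, "US"); // key absent: stored as 1
        System.out.println(aggregationMap.get("IN") + " " + aggregationMap.get("US"));
    }
}
```

Either variant avoids having to pre-populate every country code with 0 before the first update.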

Cheers

On Thu, Aug 6, 2015 at 8:54 AM, UMESH CHAUDHARY <umesh9...@gmail.com> wrote:

> Scenario is:
>
>    - I have a map of country-code as key and count as value (initially
>    count is 0)
>    - In DStream.foreachRDD I need to update the count for country in the
>    map with new aggregated value
>
> I am doing :
>
> transient Map<String,Integer> aggregationMap=new ConcurrentHashMap<String,Integer>();
>
>
> Integer requestCountPerCountry=aggregationMap.get(countryCode);
>
> aggregationMap.put(countryCode,requestCountPerCountry+1);   // Getting Following Error in this Line
>
>
> java.lang.NullPointerException
>       at JavaKafkaStream$2.call(JavaKafkaStream.java:107)
>       at JavaKafkaStream$2.call(JavaKafkaStream.java:92)
>       at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
>       at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:323)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>       at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>       at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>       at scala.util.Try$.apply(Try.scala:161)
>       at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
>       at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
>       at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
>       at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>       at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>
>
> Is this issue related to:
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> If so how can I resolve this?
>
>
