I don't think Spark Streaming supports multiple streaming contexts in one
JVM, so you can't use it that way. Instead, you could run multiple streaming
applications, since you're using YARN.
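
If the goal is just to consume several topic sets, one option is to attach
multiple Kafka direct streams to a single JavaStreamingContext and start that
one context once. A rough sketch, assuming jssc and kafkaParams are set up as
in your snippet below (the topic names here are placeholders):

import java.util.Arrays;
import java.util.HashSet;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

HashSet<String> topicsA = new HashSet<String>(Arrays.asList("x"));
HashSet<String> topicsB = new HashSet<String>(Arrays.asList("y", "z"));

// Two independent Kafka direct streams sharing the same context.
JavaPairInputDStream<String, String> streamA = KafkaUtils.createDirectStream(
        jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsA);
JavaPairInputDStream<String, String> streamB = KafkaUtils.createDirectStream(
        jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsB);

// Attach an output operation to each stream, then start the single context.
jssc.start();
jssc.awaitTermination();

Each stream gets its own output operations, but only one StreamingContext is
ever started in the JVM, which avoids the IllegalStateException you hit.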

On Friday, October 30, 2015, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> I found the NPE happens mainly because I was using the same
> JavaStreamingContext for another Kafka stream; if I change the name, it runs
> successfully. How do I run multiple JavaStreamingContexts in one program? I'm
> getting the following exception when I run multiple JavaStreamingContexts in
> a single file.
>
> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> java.lang.IllegalStateException: Only one StreamingContext may be started
> in this JVM. Currently running StreamingContext was started at
> org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>
>> From the code, the field "rememberDuration" shouldn't be null; it is
>> validated at start, unless something changes its value at runtime and makes
>> it null, though I can't imagine how that would happen. Maybe you could add
>> some logging around the place where the exception happens, if you can
>> reproduce it.
>>
>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>
>>> No, this is the only exception, and I'm getting it multiple times in my
>>> log. I was also reading some other topics earlier and didn't face this NPE.
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>
>>>> I just did a local test with your code and everything seems fine. The only
>>>> difference is that I used the master branch, but I don't think this part
>>>> has changed much. Did you hit any other exceptions or errors besides this
>>>> one? This is probably caused by other exceptions that make the system
>>>> unstable.
>>>>
>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>
>>>>> No, I don't have any special settings. Even if I keep only the reading
>>>>> line in my code, it throws the NPE.
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>>
>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>
>>>>>> Do you have any special settings? From your code, I don't see how it
>>>>>> would cause an NPE at that place.
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>
>>>>>>> Spark version: 1.4.1
>>>>>>>
>>>>>>> My code snippet:
>>>>>>>
>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>> String topics = "x,y,z";
>>>>>>> HashSet<String> topicsSet = new
>>>>>>> HashSet<String>(Arrays.asList(topics.split(",")));
>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>
>>>>>>> // Direct (receiver-less) Kafka stream over the given topics.
>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>> KafkaUtils.createDirectStream(
>>>>>>>         jssc,
>>>>>>>         String.class,
>>>>>>>         String.class,
>>>>>>>         StringDecoder.class,
>>>>>>>         StringDecoder.class,
>>>>>>>         kafkaParams,
>>>>>>>         topicsSet
>>>>>>> );
>>>>>>>
>>>>>>> // Save the message values of each batch to HDFS.
>>>>>>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>>>>>     public Void call(JavaPairRDD<String, String> tuple) {
>>>>>>>         JavaRDD<String> rdd = tuple.values();
>>>>>>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>>>         return null;
>>>>>>>     }
>>>>>>> });
>>>>>>>
>>>>>>>
>>>>>>> *Thanks*,
>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>>>
>>>>>>>> What Spark version are you using? Also, a small code snippet showing
>>>>>>>> how you use Spark Streaming would be very helpful.
>>>>>>>>
>>>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I'm able to read and print a few lines; after that I get this
>>>>>>>>> exception. Any idea why?
>>>>>>>>>
>>>>>>>>> *Thanks*,
>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm trying to read from a Kafka stream and write it to a text file,
>>>>>>>>>> using Java on Spark. I don't know why I'm getting the following
>>>>>>>>>> exception, and its message is very opaque. Can anyone please help
>>>>>>>>>> me?
>>>>>>>>>>
>>>>>>>>>> Log Trace :
>>>>>>>>>>
>>>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job
>>>>>>>>>> generator
>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>>>>>>>>>> exception: java.lang.NullPointerException
>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Thanks*,
>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
