Just put them all in one stream and switch processing based on the topic.
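
A rough sketch of what that could look like, written in plain Java with no Spark dependency so it stands alone. TaggedLine and the per-topic handlers are made-up names for illustration. In a real job the topic has to travel with each record, since the plain (key, value) pairs from the direct stream don't carry it. One option, which is an assumption worth checking against the 1.4.1 API docs, is the createDirectStream overload that takes a messageHandler.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Plain-Java sketch (no Spark dependency) of "one stream, switch on topic".
// TaggedLine and the per-topic handling are hypothetical, for illustration only.
class TopicDispatch {

    // One record from the combined stream: the topic it came from plus its payload.
    static final class TaggedLine {
        final String topic;
        final String value;

        TaggedLine(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    // Choose the processing for a single record based on its topic.
    static String process(TaggedLine line) {
        switch (line.topic) {
            case "x":
                return "x-log:" + line.value.trim();
            case "y":
                return "y-log:" + line.value.toUpperCase(Locale.ROOT);
            default:
                // Topics without special handling pass through unchanged.
                return line.topic + "-log:" + line.value;
        }
    }

    // Group the processed records by topic, keeping first-seen topic order,
    // so the logs can be joined or written out per topic afterwards.
    static Map<String, List<String>> processAll(List<TaggedLine> batch) {
        Map<String, List<String>> byTopic = new LinkedHashMap<String, List<String>>();
        for (TaggedLine line : batch) {
            List<String> out = byTopic.get(line.topic);
            if (out == null) {
                out = new ArrayList<String>();
                byTopic.put(line.topic, out);
            }
            out.add(process(line));
        }
        return byTopic;
    }

    public static void main(String[] args) {
        List<TaggedLine> batch = new ArrayList<TaggedLine>();
        batch.add(new TaggedLine("x", " a "));
        batch.add(new TaggedLine("y", "b"));
        batch.add(new TaggedLine("z", "c"));
        System.out.println(processAll(batch));
    }
}
```

In the streaming job, the same switch would sit inside whatever function is applied to each record of the single stream, so only one JavaStreamingContext is ever started.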

On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

> I want to join all those logs in some manner. That's what I'm trying to do.
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> I don't think Spark Streaming supports multiple streaming contexts in one
>> JVM, so you cannot use it that way. Instead, you could run multiple streaming
>> applications, since you're using YARN.
>>
>> On Friday, October 30, 2015, Ramkumar V <ramkumar.c...@gmail.com> wrote:
>>
>>> I found the NPE is mainly because I'm using the same JavaStreamingContext
>>> for some other Kafka stream. If I change the name, it runs
>>> successfully. How do I run multiple JavaStreamingContexts in a program? I'm
>>> getting the following exception if I run multiple JavaStreamingContexts in a
>>> single file.
>>>
>>> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
>>> exitCode: 15, (reason: User class threw exception:
>>> java.lang.IllegalStateException: Only one StreamingContext may be started
>>> in this JVM. Currently running StreamingContext was started
>>> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>>>
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>>> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>> wrote:
>>>
>>>> From the code, I think this field "rememberDuration" shouldn't be
>>>> null. It is verified at the start, so it could only be null if something
>>>> changes its value at runtime, but I cannot imagine how that would
>>>> happen. If you can reproduce it, maybe you could add some logging around
>>>> the place where the exception happens.
>>>>
>>>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>> wrote:
>>>>
>>>>> No. This is the only exception, and I'm getting it multiple times in my
>>>>> log. Also, I was reading some other topics earlier and did not face this
>>>>> NPE.
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>>
>>>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I just did a local test with your code and everything seems fine. The
>>>>>> only difference is that I use the master branch, but I don't think this
>>>>>> part has changed much. Did you see any other exceptions or errors
>>>>>> besides this one? This is probably due to other exceptions that make the
>>>>>> system unstable.
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> No, I don't have any special settings. If I keep only the reading line
>>>>>>> in my code, it throws the NPE.
>>>>>>>
>>>>>>> *Thanks*,
>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Do you have any special settings? From your code, I don't think it
>>>>>>>> would cause an NPE at that place.
>>>>>>>>
>>>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <
>>>>>>>> ramkumar.c...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Spark version: 1.4.1
>>>>>>>>>
>>>>>>>>> My code snippet:
>>>>>>>>>
>>>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>>>> String topics = "x,y,z";
>>>>>>>>> HashSet<String> TopicsSet =
>>>>>>>>>     new HashSet<String>(Arrays.asList(topics.split(",")));
>>>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>>>
>>>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>>>>     KafkaUtils.createDirectStream(
>>>>>>>>>         jssc,
>>>>>>>>>         String.class,
>>>>>>>>>         String.class,
>>>>>>>>>         StringDecoder.class,
>>>>>>>>>         StringDecoder.class,
>>>>>>>>>         kafkaParams,
>>>>>>>>>         TopicsSet);
>>>>>>>>>
>>>>>>>>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>>>>>>>     public Void call(JavaPairRDD<String, String> tuple) {
>>>>>>>>>         JavaRDD<String> rdd = tuple.values();
>>>>>>>>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>>>>>         return null;
>>>>>>>>>     }
>>>>>>>>> });
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Thanks*,
>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <
>>>>>>>>> sai.sai.s...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> What Spark version are you using? Also, a small code snippet showing
>>>>>>>>>> how you use Spark Streaming would be greatly helpful.
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <
>>>>>>>>>> ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am able to read and print a few lines. After that, I'm getting
>>>>>>>>>>> this exception. Any idea why?
>>>>>>>>>>>
>>>>>>>>>>> *Thanks*,
>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <
>>>>>>>>>>> ramkumar.c...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm trying to read from a Kafka stream and write it to a text file.
>>>>>>>>>>>> I'm using Java with Spark. I don't know why I'm getting the
>>>>>>>>>>>> following exception. Also, the exception message is very abstract.
>>>>>>>>>>>> Can anyone please help me?
>>>>>>>>>>>>
>>>>>>>>>>>> Log Trace :
>>>>>>>>>>>>
>>>>>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>         at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>>         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>>         at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>>         at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>>         at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>>         at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>>         at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
>>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>>         at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>>         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>>         at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>>         at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>>         at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>>         at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>>         at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> *Thanks*,
>>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>>
>>>>>>>>>>>>
