Just put them all in one stream and switch processing based on the topic.
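For readers who want to see what "one stream, switch on the topic" could look like, here is a minimal sketch, not taken from the thread, written in the same Java/Spark 1.4 style as the snippet quoted further down. It uses the direct stream's OffsetRange metadata (the documented HasOffsetRanges pattern) to tag each record with its source topic and then branches on that tag. The class name, batch interval, broker list, topic names, and output paths are placeholders borrowed from the snippet below.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;

    import kafka.serializer.StringDecoder;
    import scala.Tuple2;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.OffsetRange;

    public class OneStreamPerTopicSwitch {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("OneStreamPerTopicSwitch");
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

            HashMap<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", "ip:port,ip:port"); // placeholder brokers
            final HashSet<String> topicsSet =
                    new HashSet<String>(Arrays.asList("x,y,z".split(",")));

            // One direct stream subscribed to all topics.
            JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    kafkaParams, topicsSet);

            messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
                public Void call(JavaPairRDD<String, String> rdd) {
                    // The direct stream creates one RDD partition per Kafka partition, so the
                    // OffsetRange at a partition's index says which topic that partition holds.
                    final OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

                    JavaRDD<Tuple2<String, String>> tagged = rdd.mapPartitionsWithIndex(
                            new Function2<Integer, Iterator<Tuple2<String, String>>,
                                    Iterator<Tuple2<String, String>>>() {
                                public Iterator<Tuple2<String, String>> call(
                                        Integer idx, Iterator<Tuple2<String, String>> records) {
                                    // Re-key every record in this partition by its topic.
                                    String topic = offsets[idx].topic();
                                    List<Tuple2<String, String>> out =
                                            new ArrayList<Tuple2<String, String>>();
                                    while (records.hasNext()) {
                                        out.add(new Tuple2<String, String>(topic, records.next()._2()));
                                    }
                                    return out.iterator();
                                }
                            }, false);

                    // Switch processing on the topic tag; here each topic just goes to its own
                    // placeholder HDFS directory, made unique per batch to avoid overwrites.
                    for (final String topic : topicsSet) {
                        JavaRDD<String> perTopic = tagged
                                .filter(new Function<Tuple2<String, String>, Boolean>() {
                                    public Boolean call(Tuple2<String, String> t) {
                                        return t._1().equals(topic);
                                    }
                                })
                                .map(new Function<Tuple2<String, String>, String>() {
                                    public String call(Tuple2<String, String> t) {
                                        return t._2();
                                    }
                                });
                        perTopic.saveAsTextFile(
                                "hdfs://myuser:8020/user/hdfs/output/" + topic + "/"
                                        + System.currentTimeMillis());
                    }
                    return null;
                }
            });

            jssc.start();
            jssc.awaitTermination();
        }
    }

The index-to-OffsetRange mapping works only because the direct stream gives exactly one RDD partition per Kafka topic-partition; an alternative, also hedged here since it is not shown in the thread, is the createDirectStream overload that takes explicit starting offsets and a messageHandler, which can emit the topic alongside each record.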
On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

I want to join all those logs in some manner; that's what I'm trying to do.

On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

I don't think Spark Streaming supports multiple streaming contexts in one JVM, so you cannot use it that way. Instead, you could run multiple streaming applications, since you're using YARN.

On Friday, October 30, 2015, Ramkumar V <ramkumar.c...@gmail.com> wrote:

I found the NPE is mainly because I'm using the same JavaStreamingContext for some other Kafka stream; if I change the name, it runs successfully. How do I run multiple JavaStreamingContexts in one program? I get the following exception if I run multiple JavaStreamingContexts in a single file.

15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.IllegalStateException: Only one StreamingContext may be started in this JVM. Currently running StreamingContext was started at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)

On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

From the code, I think the field "rememberDuration" shouldn't be null; it is verified at start, unless something changes its value at runtime and makes it null, but I cannot imagine how that would happen. Maybe you could add some logs around the place where the exception happens, if you can reproduce it.

On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

No, this is the only exception, and I'm getting it multiple times in my log. I was also reading some other topics earlier and did not face this NPE.

On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

I just did a local test with your code and everything seems fine. The only difference is that I used the master branch, but I don't think it changes much in this part. Did you hit any other exceptions or errors besides this one? Perhaps this is due to other exceptions that make the system unstable.

On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

No, I don't have any special settings. Even if I keep only the reading line in my code, it throws the NPE.

On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

Do you have any special settings? From your code, I don't think it would incur an NPE at that place.
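For readers hitting the same "Only one StreamingContext may be started in this JVM" error quoted earlier in this thread: a minimal sketch, not from the thread, of the layout that restriction implies. Instead of creating a second JavaStreamingContext, create a single context and attach as many Kafka DStreams to it as needed. The class name, topic names, batch interval, and broker list below are placeholders.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;

    import kafka.serializer.StringDecoder;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public class SingleContextManyStreams {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("SingleContextManyStreams");
            // One JavaStreamingContext for the whole JVM; every stream hangs off it.
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

            HashMap<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", "ip:port,ip:port"); // placeholder brokers

            JavaPairInputDStream<String, String> xStream = KafkaUtils.createDirectStream(
                    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    kafkaParams, new HashSet<String>(Arrays.asList("x")));

            JavaPairInputDStream<String, String> yzStream = KafkaUtils.createDirectStream(
                    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    kafkaParams, new HashSet<String>(Arrays.asList("y", "z")));

            // Define separate processing per stream here, e.g. xStream.foreachRDD(...) and
            // yzStream.foreachRDD(...); the sketch just prints a few records from each.
            xStream.print();
            yzStream.print();

            // start()/awaitTermination() are called exactly once, after all streams are wired up.
            jssc.start();
            jssc.awaitTermination();
        }
    }

Whether the two streams share one topic subscription (as suggested at the top of the thread) or stay separate as shown here is a design choice; both avoid starting a second StreamingContext in the same JVM.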
On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

Spark version: 1.4.1

My code snippet:

    String brokers = "ip:port,ip:port";
    String topics = "x,y,z";
    HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        TopicsSet
    );

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
        public Void call(JavaPairRDD<String, String> tuple) {
            JavaRDD<String> rdd = tuple.values();
            rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
            return null;
        }
    });

On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

What Spark version are you using? Also, a small code snippet of how you use Spark Streaming would be greatly helpful.

On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

I am able to read and print a few lines; after that I get this exception. Any idea what causes it?

On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ramkumar.c...@gmail.com> wrote:

Hi,

I'm trying to read from a Kafka stream and write it to a text file, using Java on Spark. I don't know why I'm getting the following exception, and the exception message is very abstract. Can anyone please help me?
Log Trace:

15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)