Maybe differences between JavaPairDStream and JavaPairReceiverInputDStream?
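[Editor's note: on that type question, in the Spark 1.x Java API JavaPairReceiverInputDStream extends JavaPairDStream (via JavaPairInputDStream), so the broader type returned by jssc.union should be assignment-compatible and should not by itself drop messages. A minimal sketch of the relationship, assuming a running Spark 1.1 streaming application; the ZooKeeper host, group, and topic names are placeholders:]

```java
import java.util.Collections;

import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

class TypeSketch {
    static void sketch(JavaStreamingContext jsc) {
        // createStream returns the more specific receiver-input type...
        JavaPairReceiverInputDStream<String, String> receiver =
            KafkaUtils.createStream(jsc, "zkhost:2181", "myGroup",
                                    Collections.singletonMap("myTopic", 1));

        // ...which widens safely to the plain pair DStream that union()
        // returns, so every DStream operation available before the union
        // is still available after it.
        JavaPairDStream<String, String> plain = receiver;
        plain.count().print();
    }
}
```

[This cannot run standalone; it only illustrates that the two types differ by specificity, not capability.]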
On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
> The part that works is the commented-out, single-receiver stream below the
> loop. It seems that when I call KafkaUtils.createStream more than once, I
> don't receive any messages.
>
> I'll dig through the logs, but at first glance yesterday I didn't see
> anything suspect. I'll have to look closer.
>
> mn
>
> On Sep 23, 2014, at 6:14 PM, Tim Smith <secs...@gmail.com> wrote:
>
>> Maybe post the before-code, as in: what was the code before you added the
>> loop (the version that worked)? I have had similar situations where
>> comparing the code before (worked) and after (does not work) helped. The
>> Scala REPL also helps, because I can see the object types returned by each
>> statement.
>>
>> Other than code, in the driver logs you should see events that say
>> "Registered receiver for stream 0 from
>> akka.tcp://sp...@node5.acme.net:53135"
>>
>> Now, if you go to "node5" and look at the Spark or YARN container logs
>> (depending on who is doing resource management), you should be able to see
>> whether the receiver hits any errors when trying to talk to Kafka.
>>
>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>> To my eyes, these are functionally equivalent. I'll try a Scala approach,
>>> but this may cause waves for me upstream (e.g., non-Java).
>>>
>>> Thanks for looking at this. If anyone else can spot a glaring issue in the
>>> Java approach, that would be appreciated.
>>> Thanks,
>>> Matt
>>>
>>> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>>>
>>>> Sorry, I am almost Java-illiterate, but here's my Scala code to do the
>>>> equivalent (which I have tested and know works):
>>>>
>>>> // Create 10 receivers across the cluster, potentially one per partition;
>>>> // active receivers will only be as many as the Kafka partitions you have.
>>>> val kInStreams = (1 to 10).map { _ =>
>>>>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>>>>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
>>>> }
>>>>
>>>> val kInMsg = ssc.union(kInStreams)
>>>>   .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>
>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>>> So, this is scrubbed some for confidentiality, but the meat of it is as
>>>>> follows. Note that if I substitute the commented section for the loop,
>>>>> I receive messages from the topic.
>>>>>
>>>>> SparkConf sparkConf = new SparkConf();
>>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>>> sparkConf.set("spark.logConf", "true");
>>>>>
>>>>> Map<String, String> kafkaProps = new HashMap<>();
>>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>>> kafkaProps.put("group.id", groupId);
>>>>>
>>>>> JavaStreamingContext jsc =
>>>>>     new JavaStreamingContext(sparkConf, Seconds.apply(1));
>>>>> jsc.checkpoint("hdfs://<some_location>");
>>>>>
>>>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>>>>>
>>>>> for (int i = 0; i < 5; i++) {
>>>>>     streamList.add(KafkaUtils.createStream(jsc,
>>>>>                                            String.class,
>>>>>                                            ProtobufModel.class,
>>>>>                                            StringDecoder.class,
>>>>>                                            ProtobufModelDecoder.class,
>>>>>                                            kafkaProps,
>>>>>                                            Collections.singletonMap(topic, 1),
>>>>>                                            StorageLevel.MEMORY_ONLY_SER()));
>>>>> }
>>>>>
>>>>> final JavaPairDStream<String, ProtobufModel> stream =
>>>>>     jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>>>
>>>>> // final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>>>> //     KafkaUtils.createStream(jsc,
>>>>> //                             String.class,
>>>>> //                             ProtobufModel.class,
>>>>> //                             StringDecoder.class,
>>>>> //                             ProtobufModelDecoder.class,
>>>>> //                             kafkaProps,
>>>>> //                             Collections.singletonMap(topic, 5),
>>>>> //                             StorageLevel.MEMORY_ONLY_SER());
>>>>>
>>>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>>>     new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>>>         @Override
>>>>>         public Tuple2<String, Integer> call(
>>>>>                 Tuple2<String, ProtobufModel> tuple) throws Exception {
>>>>>             return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>>>         }
>>>>>     });
>>>>>
>>>>> ... and further Spark functions ...
>>>>>
>>>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>
>>>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>>>
>>>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>>>>> Hey,
>>>>>>>
>>>>>>> Spark 1.1.0
>>>>>>> Kafka 0.8.1.1
>>>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>>>
>>>>>>> I have a five-partition Kafka topic. I can create a single Kafka
>>>>>>> receiver via KafkaUtils.createStream with five threads in the topic map
>>>>>>> and consume messages fine. Sifting through the user list and Google, I
>>>>>>> see that it's possible to split the Kafka receivers among the Spark
>>>>>>> workers such that I can have a receiver per partition, and have this
>>>>>>> distributed to workers rather than localized to the driver. I'm
>>>>>>> following something like this:
>>>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>>>> but for Kafka, obviously.
>>>>>>> From the Streaming Programming Guide: "Receiving multiple data streams
>>>>>>> can therefore be achieved by creating multiple input DStreams and
>>>>>>> configuring them to receive different partitions of the data stream
>>>>>>> from the source(s)."
>>>>>>>
>>>>>>> However, I'm not able to consume any messages from Kafka after I
>>>>>>> perform the union operation. Again, if I create a single,
>>>>>>> multi-threaded receiver, I can consume messages fine. If I create 5
>>>>>>> receivers in a loop and call jssc.union(...), I get:
>>>>>>>
>>>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>>>
>>>>>>> Do I need to do anything to the unioned DStream? Am I going about this
>>>>>>> incorrectly?
>>>>>>>
>>>>>>> Thanks in advance.
>>>>>>>
>>>>>>> Matt

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
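[Editor's note: for later readers, the multi-receiver pattern under discussion can be sketched end-to-end as below. One thing worth checking when every stream reports "0 blocks": each receiver permanently occupies one core, and the Streaming guide notes the application must be allocated more cores than it has receivers (here, at least 6), or no data gets processed. This is a hedged sketch against the Spark 1.1 Java API, not a tested fix for this thread; the ZooKeeper host, group, and topic names are placeholders.]

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class MultiReceiverSketch {
    public static void main(String[] args) {
        // The deployment must grant more cores than receivers:
        // 5 receivers plus at least 1 core left over for processing.
        SparkConf conf = new SparkConf().setAppName("multi-receiver-sketch");
        JavaStreamingContext jsc =
            new JavaStreamingContext(conf, Seconds.apply(1));

        // One receiver per Kafka partition: 5 separate createStream calls,
        // each with a single consumer thread for the topic.
        List<JavaPairDStream<String, String>> streams = new ArrayList<>(5);
        for (int i = 0; i < 5; i++) {
            streams.add(KafkaUtils.createStream(jsc, "zkhost:2181", "myGroup",
                    Collections.singletonMap("myTopic", 1),
                    StorageLevel.MEMORY_ONLY_SER()));
        }

        // union(first, rest) merges the five receiver streams into one
        // DStream for downstream transformations.
        JavaPairDStream<String, String> union =
            jsc.union(streams.get(0), streams.subList(1, streams.size()));

        union.print();
        jsc.start();
        jsc.awaitTermination();
    }
}
```

[Runs only as a submitted Spark application against a live Kafka/ZooKeeper; shown here solely to consolidate the pattern from the thread.]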