Could the difference between JavaPairDStream and JavaPairReceiverInputDStream be the cause?
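
For what it's worth, as far as I know JavaPairReceiverInputDStream is a subclass of JavaPairDStream in the Java API, so adding the result of KafkaUtils.createStream to a List<JavaPairDStream<...>> is an ordinary upcast, and the type difference alone should not stop messages from arriving. Here is a minimal, Spark-free sketch of that shape. PairStream, ReceiverPairStream, and UnionSketch are hypothetical stand-ins made up for illustration, not Spark classes; only the union(first, rest) calling pattern is modeled on the code quoted below.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a general pair stream (NOT the Spark class).
class PairStream {
    // Names of the sources feeding this stream, kept only for illustration.
    final List<String> sources = new ArrayList<>();

    PairStream(String source) {
        sources.add(source);
    }

    private PairStream() {
    }

    // Mirrors the union(first, rest) calling shape: merge one stream with a list of others.
    static PairStream union(PairStream first, List<? extends PairStream> rest) {
        PairStream merged = new PairStream();
        merged.sources.addAll(first.sources);
        for (PairStream s : rest) {
            merged.sources.addAll(s.sources);
        }
        return merged;
    }
}

// Hypothetical stand-in for a receiver-backed stream: a subtype of the general stream.
class ReceiverPairStream extends PairStream {
    ReceiverPairStream(String source) {
        super(source);
    }
}

class UnionSketch {
    // Builds `receivers` stand-in streams and unions them; assumes receivers >= 1.
    static PairStream build(int receivers) {
        List<PairStream> streams = new ArrayList<>(receivers);
        for (int i = 0; i < receivers; i++) {
            // The subtype is stored in a list of the supertype (a plain upcast).
            streams.add(new ReceiverPairStream("receiver-" + i));
        }
        // Split into "first" and "rest", as in the quoted jsc.union(...) call.
        return PairStream.union(streams.get(0), streams.subList(1, streams.size()));
    }

    public static void main(String[] args) {
        System.out.println(build(5).sources);
    }
}
```

Running main lists the five stand-in source names. That only shows the list-building and the first/rest split are sound on their own; it says nothing about what the real receivers do at runtime.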

On Wed, Sep 24, 2014 at 7:46 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
> The part that works is the commented out, single receiver stream below the 
> loop.  It seems that when I call KafkaUtils.createStream more than once, I 
> don’t receive any messages.
>
> I’ll dig through the logs, but at first glance yesterday I didn’t see 
> anything suspect.  I’ll have to look closer.
>
> mn
>
> On Sep 23, 2014, at 6:14 PM, Tim Smith <secs...@gmail.com> wrote:
>
>> Could you post the code as it was before you added the loop (the
>> version that worked)? I have had similar situations where comparing the
>> code before (working) and after (not working) helped. The Scala REPL
>> also helped, because it shows the object type returned by each
>> statement.
>>
>> Other than code, in the driver logs, you should see events that say
>> "Registered receiver for stream 0 from
>> akka.tcp://sp...@node5.acme.net:53135"
>>
>> Now, if you go to "node5" and look at the Spark or YARN container logs
>> (depending on which resource manager you use), you should be able to see
>> whether the receiver hits any errors when trying to talk to Kafka.
>>
>>
>>
>> On Tue, Sep 23, 2014 at 3:21 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>> To my eyes, these are functionally equivalent.  I’ll try a Scala approach,
>>> but this may cause waves for me upstream (i.e., it is not Java).
>>>
>>> Thanks for looking at this.  If anyone else can see a glaring issue in the 
>>> Java approach that would be appreciated.
>>>
>>> Thanks,
>>> Matt
>>>
>>> On Sep 23, 2014, at 4:13 PM, Tim Smith <secs...@gmail.com> wrote:
>>>
>>>> Sorry, I am almost Java illiterate but here's my Scala code to do the
>>>> equivalent (that I have tested to work):
>>>>
>>>> import org.apache.spark.storage.StorageLevel
>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>
>>>> // Create 10 receivers across the cluster, potentially one per partition;
>>>> // only as many receivers are active as there are Kafka partitions.
>>>> val kInStreams = (1 to 10).map { _ =>
>>>>   KafkaUtils.createStream(ssc, "zkhost.acme.net:2182", "myGrp",
>>>>     Map("myTopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
>>>> }
>>>>
>>>> val kInMsg =
>>>>   ssc.union(kInStreams).persist(StorageLevel.MEMORY_AND_DISK_SER)
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Sep 23, 2014 at 2:20 PM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>>> So, this is scrubbed some for confidentiality, but the meat of it is as
>>>>> follows.  Note that if I substitute the commented section for the loop,
>>>>> I receive messages from the topic.
>>>>>
>>>>> SparkConf sparkConf = new SparkConf();
>>>>> sparkConf.set("spark.streaming.unpersist", "true");
>>>>> sparkConf.set("spark.logConf", "true");
>>>>>
>>>>> Map<String, String> kafkaProps = new HashMap<>();
>>>>> kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
>>>>> kafkaProps.put("group.id", groupId);
>>>>>
>>>>> JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
>>>>> jsc.checkpoint("hdfs://<some_location>");
>>>>>
>>>>> List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
>>>>>
>>>>> for (int i = 0; i < 5; i++) {
>>>>>   streamList.add(KafkaUtils.createStream(jsc,
>>>>>                                          String.class, ProtobufModel.class,
>>>>>                                          StringDecoder.class, ProtobufModelDecoder.class,
>>>>>                                          kafkaProps,
>>>>>                                          Collections.singletonMap(topic, 1),
>>>>>                                          StorageLevel.MEMORY_ONLY_SER()));
>>>>> }
>>>>>
>>>>> final JavaPairDStream<String, ProtobufModel> stream =
>>>>>     jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
>>>>>
>>>>> //  final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
>>>>> //                  KafkaUtils.createStream(jsc,
>>>>> //                                          String.class, ProtobufModel.class,
>>>>> //                                          StringDecoder.class, ProtobufModelDecoder.class,
>>>>> //                                          kafkaProps,
>>>>> //                                          Collections.singletonMap(topic, 5),
>>>>> //                                          StorageLevel.MEMORY_ONLY_SER());
>>>>>
>>>>> final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
>>>>>       new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
>>>>>           @Override
>>>>>           public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple) throws Exception {
>>>>>               return new Tuple2<>(tuple._2().getDeviceId(), 1);
>>>>>           }
>>>>>       });
>>>>>
>>>>> … and further Spark functions ...
>>>>>
>>>>> On Sep 23, 2014, at 2:55 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>
>>>>>> Posting your code would be really helpful in figuring out gotchas.
>>>>>>
>>>>>> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <matt.narr...@gmail.com> wrote:
>>>>>>> Hey,
>>>>>>>
>>>>>>> Spark 1.1.0
>>>>>>> Kafka 0.8.1.1
>>>>>>> Hadoop (YARN/HDFS) 2.5.1
>>>>>>>
>>>>>>> I have a five-partition Kafka topic.  I can create a single Kafka receiver
>>>>>>> via KafkaUtils.createStream with five threads in the topic map and consume
>>>>>>> messages fine.  Sifting through the user list and Google, I see that it's
>>>>>>> possible to split the Kafka receiver among the Spark workers such that I can
>>>>>>> have a receiver per topic, and have this distributed to workers rather than
>>>>>>> localized to the driver.  I’m following something like this:
>>>>>>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>>>>>>> but for Kafka, obviously.  From the Streaming Programming Guide: "Receiving
>>>>>>> multiple data streams can therefore be achieved by creating multiple input
>>>>>>> DStreams and configuring them to receive different partitions of the data
>>>>>>> stream from the source(s)."
>>>>>>>
>>>>>>> However, I’m not able to consume any messages from Kafka after I perform the
>>>>>>> union operation.  Again, if I create a single, multi-threaded receiver, I
>>>>>>> can consume messages fine.  If I create 5 receivers in a loop and call
>>>>>>> jssc.union(…), I get:
>>>>>>>
>>>>>>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>>>>>>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>>>>>>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>>>>>>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>>>>>>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>>>>>>
>>>>>>>
>>>>>>> Do I need to do anything to the unioned DStream?  Am I going about this
>>>>>>> incorrectly?
>>>>>>>
>>>>>>> Thanks in advance.
>>>>>>>
>>>>>>> Matt
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>>
>>>>
>>>
>>
>
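
One more sanity check on the two setups quoted above: with Kafka's high-level consumer, each partition in a consumer group is read by one consumer thread at a time, so threads beyond the partition count sit idle. Assuming that model, a single receiver with 5 threads and 5 receivers with 1 thread each both give 5 consumer threads for a 5-partition topic, while 10 single-thread receivers would leave 5 idle. A small sketch of the arithmetic (ConsumerThreadSketch and its method names are hypothetical helpers, not part of Spark or Kafka):

```java
// Hypothetical helper for illustration only; not a Spark or Kafka API.
class ConsumerThreadSketch {
    // Total consumer threads requested across all receivers.
    static int totalThreads(int receivers, int threadsPerReceiver) {
        return receivers * threadsPerReceiver;
    }

    // Threads that can actually own a partition: at most one thread per partition.
    static int activeThreads(int receivers, int threadsPerReceiver, int partitions) {
        return Math.min(totalThreads(receivers, threadsPerReceiver), partitions);
    }

    // Threads left without a partition to read.
    static int idleThreads(int receivers, int threadsPerReceiver, int partitions) {
        return totalThreads(receivers, threadsPerReceiver)
                - activeThreads(receivers, threadsPerReceiver, partitions);
    }

    public static void main(String[] args) {
        int partitions = 5; // the five-partition topic from the original question
        System.out.println("1 receiver x 5 threads, active: " + activeThreads(1, 5, partitions));
        System.out.println("5 receivers x 1 thread, active: " + activeThreads(5, 1, partitions));
        System.out.println("10 receivers x 1 thread, idle: " + idleThreads(10, 1, partitions));
    }
}
```

So by thread count alone, both variants should be able to cover all five partitions, which suggests looking at the receiver logs rather than at the topic map.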
