So, this is scrubbed some for confidentiality, but the meat of it is as
follows. Note that if I substitute the commented section for the loop, I
receive messages from the topic.
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.streaming.unpersist", "true");
sparkConf.set("spark.logConf", "true");

Map<String, String> kafkaProps = new HashMap<>();
kafkaProps.put("zookeeper.connect", Constants.ZK_ENSEMBLE + "/kafka");
kafkaProps.put("group.id", groupId);

JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Seconds.apply(1));
jsc.checkpoint("hdfs://<some_location>");

// One receiver per partition of the five-partition topic, unioned below.
List<JavaPairDStream<String, ProtobufModel>> streamList = new ArrayList<>(5);
for (int i = 0; i < 5; i++) {
    streamList.add(KafkaUtils.createStream(jsc,
            String.class, ProtobufModel.class,
            StringDecoder.class, ProtobufModelDecoder.class,
            kafkaProps,
            Collections.singletonMap(topic, 1),
            StorageLevel.MEMORY_ONLY_SER()));
}

final JavaPairDStream<String, ProtobufModel> stream =
        jsc.union(streamList.get(0), streamList.subList(1, streamList.size()));
// final JavaPairReceiverInputDStream<String, ProtobufModel> stream =
//         KafkaUtils.createStream(jsc,
//                 String.class, ProtobufModel.class,
//                 StringDecoder.class, ProtobufModelDecoder.class,
//                 kafkaProps,
//                 Collections.singletonMap(topic, 5),
//                 StorageLevel.MEMORY_ONLY_SER());
final JavaPairDStream<String, Integer> tuples = stream.mapToPair(
        new PairFunction<Tuple2<String, ProtobufModel>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, ProtobufModel> tuple)
                    throws Exception {
                // Emit (deviceId, 1) per message so counts can be aggregated downstream.
                return new Tuple2<>(tuple._2().getDeviceId(), 1);
            }
        });
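The mapToPair above emits a (deviceId, 1) pair per message; the elided downstream step is presumably a reduceByKey-style count per device. Stripped of Spark, that aggregation is just a group-by-count, which can be sketched in plain Java for clarity (Event is a hypothetical stand-in for ProtobufModel, not a class from the thread):

```java
import java.util.*;
import java.util.stream.*;

public class DeviceCounts {
    // Hypothetical stand-in for the thread's ProtobufModel.
    record Event(String deviceId) {}

    // Equivalent of mapToPair(e -> (e.deviceId, 1)) followed by reduceByKey(_ + _).
    static Map<String, Long> countByDevice(List<Event> events) {
        return events.stream()
                .collect(Collectors.groupingBy(Event::deviceId, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Event> batch = List.of(new Event("a"), new Event("b"), new Event("a"));
        System.out.println(countByDevice(batch)); // {a=2, b=1}
    }
}
```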
… and further Spark functions ...
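One thing worth checking (a common gotcha with receiver-based streaming, not something the thread confirms): each receiver permanently occupies one executor core, so a job with five receivers needs strictly more than five cores in total, or no cores are left to process the received blocks. On YARN that means sizing the launch accordingly; a hypothetical sketch, where the executor counts, class, and jar names are assumptions:

```shell
# Hypothetical launch: 5 receivers need > 5 total cores, e.g. 3 executors x 2 cores = 6.
# Class and jar names are placeholders, not taken from the thread.
spark-submit \
  --master yarn-cluster \
  --num-executors 3 \
  --executor-cores 2 \
  --class com.example.SparkKafkaJob \
  spark-kafka-job.jar
```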
On Sep 23, 2014, at 2:55 PM, Tim Smith <[email protected]> wrote:
> Posting your code would be really helpful in figuring out gotchas.
>
> On Tue, Sep 23, 2014 at 9:19 AM, Matt Narrell <[email protected]> wrote:
>> Hey,
>>
>> Spark 1.1.0
>> Kafka 0.8.1.1
>> Hadoop (YARN/HDFS) 2.5.1
>>
>> I have a five-partition Kafka topic. I can create a single Kafka receiver
>> via KafkaUtils.createStream with five threads in the topic map and consume
>> messages fine. Sifting through the user list and Google, I see that it's
>> possible to split the Kafka receiver among the Spark workers, such that I can
>> have a receiver per topic partition, and have this distributed to the workers
>> rather than localized to the driver. I’m following something like this:
>> https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L132
>> But for Kafka, obviously. From the Streaming Programming Guide: “Receiving
>> multiple data streams can therefore be achieved by creating multiple input
>> DStreams and configuring them to receive different partitions of the data
>> stream from the source(s)."
>>
>> However, I’m not able to consume any messages from Kafka after I perform the
>> union operation. Again, if I create a single, multi-threaded receiver, I
>> can consume messages fine. If I create 5 receivers in a loop and call
>> jssc.union(…), I get:
>>
>> INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 2 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 3 received 0 blocks
>> INFO scheduler.ReceiverTracker: Stream 4 received 0 blocks
>>
>>
>> Do I need to do anything to the unioned DStream? Am I going about this
>> incorrectly?
>>
>> Thanks in advance.
>>
>> Matt
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>