I am doing the same thing this way:
// Iterate over the HashSet of topics and create one direct stream per topic
Iterator<String> iterator = topicsSet.iterator();
JavaPairInputDStream<String, String> messages;
JavaDStream<String> lines;
String topic = "";

while (iterator.hasNext()) {
    topic = iterator.next();
    // Create a direct Kafka stream with the brokers for this single topic
    messages = KafkaUtils.createDirectStream(jssc, String.class,
            String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
            new HashSet<String>(Arrays.asList(topic)));
    // Extract the message value from each (key, value) tuple
    lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });
    // Dispatch the stream to the processing specific to this topic
    switch (topic) {
        case IMPR_ACC:
            ImprLogProc.groupAndCount(lines, esImpIndexName, IMPR_ACC,
                    new ImprMarshal());
            break;
        case EVENTS_ACC:
            EventLogProc.groupAndCount(lines, esEventIndexName,
                    EVENTS_ACC, new EventMarshal());
            break;
        default:
            logger.error("No matching Kafka topics found");
            break;
    }
}
On Tue, Mar 15, 2016 at 12:22 PM, Akhil Das <[email protected]>
wrote:
> One way would be to keep them as separate streams:
>
> val stream1 = KafkaUtils.createStream(..) // for topic 1
>
> val stream2 = KafkaUtils.createStream(..) // for topic 2
>
>
> And you will know which stream belongs to which topic.
>
> Another approach, which you can implement in your code itself, is to tag the
> topic name onto the stream you are creating: build a tuple (topic, stream),
> and you can then access ._1 as the topic and ._2 as the stream.
>
>
> Thanks
> Best Regards
>
> On Tue, Mar 15, 2016 at 12:05 PM, Imre Nagi <[email protected]>
> wrote:
>
>> Hi,
>>
>> I'm just trying to create a Spark Streaming application that consumes
>> more than one topic from Kafka. Then, I want to do different further
>> processing on the data from each topic.
>>
>>> val kafkaStreams = {
>>>   val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
>>>     Map(
>>>       "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
>>>       "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
>>>       "group.id" -> consumerGroup,
>>>       "zookeeper.connection.timeout.ms" -> ConsumerConfig.zookeeperConnectionTimeout,
>>>       "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
>>>       "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
>>>     )
>>>   }
>>>   val streams = (0 to kafkaParameter.length - 1) map { p =>
>>>     KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>>>       ssc,
>>>       kafkaParameter(p),
>>>       Map(topicsArr(p) -> 1),
>>>       StorageLevel.MEMORY_ONLY_SER
>>>     ).map(_._2)
>>>   }
>>>   val unifiedStream = ssc.union(streams)
>>>   unifiedStream.repartition(1)
>>> }
>>> kafkaStreams.foreachRDD(rdd => {
>>>   rdd.foreachPartition(partitionOfRecords => {
>>>     partitionOfRecords.foreach(x =>
>>>       println(x)
>>>     )
>>>   })
>>> })
>>
>>
>> So far, I'm able to get the data from several topics. However, I'm still
>> unable to differentiate the data coming from one topic from the data
>> coming from another.
>>
>> Does anybody have experience doing this?
>>
>> Best,
>> Imre
>>
>
>
--
Thanks,
Saurabh
:)