Hi,
I'm trying to write a Spark Streaming application that consumes more
than one Kafka topic, and then applies different downstream processing
to the data from each topic.
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaStreams = {
  // One parameter map per consumer group (ConsumerConfig is my own config object)
  val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
    Map(
      "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
      "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
      "group.id" -> consumerGroup,
      "zookeeper.connection.timeout.ms" -> ConsumerConfig.zookeeperConnectionTimeout,
      "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
      "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
    )
  }
  // One receiver stream per topic; .map(_._2) keeps only the message payload
  val streams = kafkaParameter.indices map { p =>
    KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc,
      kafkaParameter(p),
      Map(topicsArr(p) -> 1),
      StorageLevel.MEMORY_ONLY_SER
    ).map(_._2)
  }
  val unifiedStream = ssc.union(streams)
  unifiedStream.repartition(1)
}

kafkaStreams.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    partitionOfRecords.foreach(println)
  }
}
So far I'm able to get the data from the several topics. However, after
the union I can no longer tell which topic a given record came from.
Does anybody have experience with this?
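One idea I've been considering is to tag each record with its topic name
before the union, so the topic survives the merge. Below is a rough,
untested sketch against the same topicsArr, kafkaParameter, and ssc as
above; the tagging and the per-topic dispatch are just placeholders, not
working code:

// Same receivers as before, but keep the topic alongside each payload
val taggedStreams = kafkaParameter.indices map { p =>
  val topic = topicsArr(p)  // capture locally so the closure serializes cleanly
  KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc,
    kafkaParameter(p),
    Map(topic -> 1),
    StorageLevel.MEMORY_ONLY_SER
  ).map { case (_, payload) => (topic, payload) }
}

// Every record now carries its topic, so processing can branch on it
ssc.union(taggedStreams).foreachRDD { rdd =>
  rdd.foreach { case (topic, payload) =>
    // placeholder: dispatch to topic-specific processing here
    println(s"$topic: ${payload.length} bytes")
  }
}

Is this a reasonable approach, or is there a better way?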
Best,
Imre