Re: Streaming app consume multiple kafka topics
Hi Cody,

Can you give a small example of how to use mapPartitions with a switch on topic? I've tried, but it still didn't work.

On Tue, Mar 15, 2016 at 9:45 PM, Cody Koeninger wrote:
> The direct stream gives you access to the topic. The offset range for
> each partition contains the topic. That way you can create a single
> stream, and the first thing you do with it is mapPartitions with a
> switch on topic.
>
> Of course, it may make more sense to separate topics into different
> jobs, but if you want it all in one, that's the most straightforward
> way to do it imho.
>
> On Tue, Mar 15, 2016 at 1:55 AM, saurabh guru wrote:
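For readers following the thread: Cody's direct-stream suggestion can be sketched roughly as below. This is only a sketch against the spark-streaming-kafka 0.8 direct API; the topic names "impressions" and "events" and the per-topic processing are hypothetical, and `ssc` and `kafkaParams` are assumed to exist in the surrounding program. It needs a running Spark and Kafka setup to actually execute.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// One direct stream covering both topics.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("impressions", "events"))

val routed = stream.transform { rdd =>
  // With the direct stream, each Kafka partition maps to one Spark
  // partition, so the i-th OffsetRange describes RDD partition i.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    offsetRanges(i).topic match {
      case "impressions" => iter.map { case (_, v) => "impr: " + v }  // impression-specific step
      case "events"      => iter.map { case (_, v) => "event: " + v } // event-specific step
      case _             => Iterator.empty                            // unknown topic: drop
    }
  }
}
```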
Re: Streaming app consume multiple kafka topics
I am doing the same thing this way:

    // Iterate over the HashSet of topics
    Iterator<String> iterator = topicsSet.iterator();
    JavaPairInputDStream<String, String> messages;
    JavaDStream<String> lines;
    String topic = "";
    // Get a message stream for each topic
    while (iterator.hasNext()) {
        topic = iterator.next();
        // Create a direct Kafka stream with brokers and this topic
        messages = KafkaUtils.createDirectStream(jssc, String.class,
                String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, new HashSet<String>(Arrays.asList(topic)));

        // Extract the message values
        lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        switch (topic) {
            case IMPR_ACC:
                ImprLogProc.groupAndCount(lines, esImpIndexName, IMPR_ACC,
                        new ImprMarshal());
                break;
            case EVENTS_ACC:
                EventLogProc.groupAndCount(lines, esEventIndexName,
                        EVENTS_ACC, new EventMarshal());
                break;
            default:
                logger.error("No matching Kafka topic found");
                break;
        }
    }

On Tue, Mar 15, 2016 at 12:22 PM, Akhil Das wrote:

--
Thanks,
Saurabh

:)
Re: Streaming app consume multiple kafka topics
Actually, I have tried your suggestion, but it doesn't seem to work. Let me try it once again. Thanks for your help.

Best,
Imre

On Tue, Mar 15, 2016 at 1:52 PM, Akhil Das wrote:
Re: Streaming app consume multiple kafka topics
One way would be to keep it this way:

    val stream1 = KafkaUtils.createStream(..) // for topic 1
    val stream2 = KafkaUtils.createStream(..) // for topic 2

That way you will know which stream belongs to which topic.

Another approach, which you can put in your code itself, would be to tag the topic name along with the stream that you are creating. That is, create a tuple (topic, stream), and you will be able to access ._1 as the topic and ._2 as the stream.

Thanks
Best Regards

On Tue, Mar 15, 2016 at 12:05 PM, Imre Nagi wrote:
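The (topic, stream) tagging idea above could look roughly like this. This is only a sketch: the topic names are hypothetical, and `ssc`, `zkQuorum`, and `groupId` are assumed to be defined elsewhere; it needs a live Spark and Kafka setup to run.

```scala
import org.apache.spark.streaming.kafka.KafkaUtils

val topics = Seq("impressions", "events")

// Pair each topic name with its own receiver-based stream.
val taggedStreams = topics.map { topic =>
  (topic, KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 1)).map(_._2))
}

taggedStreams.foreach { case (topic, stream) =>
  // ._1 is the topic, ._2 is its stream; dispatch per-topic logic here.
  stream.foreachRDD(rdd => println(s"$topic: ${rdd.count()} records"))
}
```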
Streaming app consume multiple kafka topics
Hi,

I'm trying to create a Spark Streaming application that consumes more than one Kafka topic. Then I want to do different further processing on the data from each topic.

    val kafkaStreams = {
      val kafkaParameter = for (consumerGroup <- consumerGroups) yield {
        Map(
          "metadata.broker.list" -> ConsumerConfig.metadataBrokerList,
          "zookeeper.connect" -> ConsumerConfig.zookeeperConnect,
          "group.id" -> consumerGroup,
          "zookeeper.connection.timeout.ms" ->
            ConsumerConfig.zookeeperConnectionTimeout,
          "schema.registry.url" -> ConsumerConfig.schemaRegistryUrl,
          "auto.offset.reset" -> ConsumerConfig.autoOffsetReset
        )
      }
      val streams = kafkaParameter.indices.map { p =>
        KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
          ssc,
          kafkaParameter(p),
          Map(topicsArr(p) -> 1),
          StorageLevel.MEMORY_ONLY_SER
        ).map(_._2)
      }
      val unifiedStream = ssc.union(streams)
      unifiedStream.repartition(1)
    }
    kafkaStreams.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(x => println(x))
      })
    })

So far I'm able to get the data from the several topics. However, after the union I'm unable to tell which topic a given record came from.

Does anybody have experience doing this?

Best,
Imre
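One possible way to keep the topic information across the union, sketched against the code above (same `ssc`, `kafkaParameter`, and `topicsArr`; not a tested implementation), is to tag every record with its topic before unioning, so the per-topic distinction survives the merge:

```scala
val streams = topicsArr.zipWithIndex.map { case (topic, p) =>
  KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
    ssc, kafkaParameter(p), Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)
    .map { case (_, value) => (topic, value) } // tag each record with its topic
}

// Every record in the unified stream is now a (topic, bytes) pair,
// so downstream code can still branch per topic.
ssc.union(streams).foreachRDD { rdd =>
  rdd.foreach { case (topic, bytes) => println(s"$topic: ${bytes.length} bytes") }
}
```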