Yeah, that's the general idea. When you say hardcode the topic names, do you mean Set(topicA, topicB, topicC)? You should be able to use a variable for that - read it from a config file, whatever.
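For example, a minimal sketch of reading the topic set from config, assuming Typesafe Config is on the classpath (the kafka.topics key is just a hypothetical name):

    import com.typesafe.config.ConfigFactory
    import scala.collection.JavaConverters._

    // application.conf (hypothetical key):
    //   kafka.topics = ["topicA", "topicB", "topicC"]
    val topics: Set[String] =
      ConfigFactory.load().getStringList("kafka.topics").asScala.toSet

    // then pass the whole set to one createDirectStream call:
    //   KafkaUtils.createDirectStream[...](ssc, kafkaParams, topics)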
If you're talking about the match statement, yeah you'd need to hardcode your cases.

On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей <[email protected]> wrote:

> Ok. I got it!
> But it seems that I need to hardcode the topic names.
>
> Something like this?
>
> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
>   DefaultDecoder, DefaultDecoder](
>     ssc, kafkaParams, Set(topicA, topicB, topicC))
>   .transform { rdd =>
>     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>     rdd.mapPartitionsWithIndex(
>       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
>         offsetRanges(idx).topic match {
>           case "topicA" => ...
>           case "topicB" => ...
>           case _ => ...
>         }
>     )
>   }
>
> 08.09.2015, 19:21, "Cody Koeninger" <[email protected]>:
>
> That doesn't really matter. With the direct stream you'll get all objects
> for a given topic-partition in the same Spark partition. You know what
> topic it's from via HasOffsetRanges. Then you can deserialize
> appropriately based on topic.
>
> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей <[email protected]> wrote:
>
> The thing is that these topics contain absolutely different Avro
> objects (Array[Byte]) that I need to deserialize into different Java (Scala)
> objects, filter and then map to tuples (String, String). So I have 3 streams
> with different Avro objects in them. I need to convert them (using some
> business rules) to pairs and union them.
>
> 08.09.2015, 19:11, "Cody Koeninger" <[email protected]>:
>
> I'm not 100% sure what's going on there, but why are you doing a union
> in the first place?
>
> If you want multiple topics in a stream, just pass them all in the set
> of topics to one call to createDirectStream.
>
> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <[email protected]> wrote:
>
> >> Ok.
> >> Spark 1.4.1 on YARN.
> >>
> >> Here is my application.
> >> I have 4 different Kafka topics (different object streams):
> >>
> >> type Edge = (String, String)
> >>
> >> val a = KafkaUtils.createDirectStream[...](ssc, params, Set("A")).filter( nonEmpty ).map( toEdge )
> >> val b = KafkaUtils.createDirectStream[...](ssc, params, Set("B")).filter( nonEmpty ).map( toEdge )
> >> val c = KafkaUtils.createDirectStream[...](ssc, params, Set("C")).filter( nonEmpty ).map( toEdge )
> >>
> >> val u = a union b union c
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> val z = KafkaUtils.createDirectStream[...](ssc, params, Set("Z")).filter( nonEmpty ).map( toEdge )
> >>
> >> val joinResult = source.rightOuterJoin( z )
> >> joinResult.foreachRDD { rdd =>
> >>   rdd.foreachPartition { partition =>
> >>     ... // save to result topic in Kafka
> >>   }
> >> }
> >>
> >> The 'window' in the code above is constantly growing,
> >> no matter how many events appear in the corresponding Kafka topics,
> >>
> >> but if I change one line from
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> to
> >>
> >> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
> >>
> >> val source = u.transform(_.partitionBy(partitioner.value)).window(Seconds(600), Seconds(10))
> >>
> >> everything works perfectly.
> >>
> >> Perhaps the problem was in WindowedDStream.
> >>
> >> I forced it to use PartitionerAwareUnionRDD (by calling partitionBy with the same partitioner) instead of UnionRDD.
> >>
> >> Nonetheless, I did not see any hints about such behaviour in the docs.
> >> Is it a bug or absolutely normal behaviour?
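For the "deserialize appropriately based on topic" step, here is one possible sketch of the body of that match using Avro's SpecificDatumReader; RecordA, RecordB, toEdgeA and toEdgeB are hypothetical stand-ins for your generated Avro classes and mapping functions:

    import org.apache.avro.io.DecoderFactory
    import org.apache.avro.specific.SpecificDatumReader

    // Generic binary-Avro decode helper; a reader is created once per partition.
    def decode[T](reader: SpecificDatumReader[T], bytes: Array[Byte]): T =
      reader.read(null.asInstanceOf[T], DecoderFactory.get().binaryDecoder(bytes, null))

    rdd.mapPartitionsWithIndex { (idx, itr) =>
      offsetRanges(idx).topic match {
        case "topicA" =>
          val reader = new SpecificDatumReader[RecordA](RecordA.getClassSchema)
          itr.map { case (_, bytes) => toEdgeA(decode(reader, bytes)) }
        case "topicB" =>
          val reader = new SpecificDatumReader[RecordB](RecordB.getClassSchema)
          itr.map { case (_, bytes) => toEdgeB(decode(reader, bytes)) }
        case _ =>
          Iterator.empty // unknown topic: drop (or log) rather than fail
      }
    }

Since every record from a given topic-partition lands in the same Spark partition, each partition needs exactly one reader, which is why it is constructed inside mapPartitionsWithIndex rather than per record.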
> >> 08.09.2015, 17:03, "Cody Koeninger" <[email protected]>:
> >>
> >>> Can you provide more info (what version of Spark, a code example)?
> >>>
> >>> On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <[email protected]> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> I have an application with 2 streams, which are joined together.
> >>>> Stream1 is a simple DStream (relatively small batch chunks).
> >>>> Stream2 is a windowed DStream (with a duration of, for example, 60 seconds).
> >>>>
> >>>> Stream1 and Stream2 are Kafka direct streams.
> >>>> The problem is that, according to the logs, the window operation is constantly
> >>>> increasing (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
> >>>> I also see a gap in the processing window (screenshot:
> >>>> http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
> >>>> in the logs there are no events in that period.
> >>>> So what happens in that gap, and why is the window constantly increasing?
> >>>>
> >>>> Thank you in advance
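For reference, a condensed sketch of the workaround Alexey describes above, using his stream names; the partition count of 8 is arbitrary:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.Seconds

    // Broadcast one partitioner so every batch RDD partitions identically.
    val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))

    // Because each batch RDD entering the window now carries the same
    // partitioner, Spark can combine them with PartitionerAwareUnionRDD
    // instead of a plain UnionRDD.
    val source = (a union b union c)
      .transform(_.partitionBy(partitioner.value))
      .window(Seconds(600), Seconds(10))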
