The thing is that these topics contain completely different Avro objects (Array[Byte]) that I need to deserialize into different Java (Scala) objects, filter, and then map to (String, String) tuples. So I have 3 streams, each carrying a different Avro object type. I need to convert them (using some business rules) to pairs and then unite them.
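
For concreteness, a minimal sketch of that setup. The payload classes (OrderEvent, ClickEvent) and the decoders below are hypothetical stand-ins for the real Avro-generated classes and business rules; only the overall shape, one decoder per topic followed by a union of the now-homogeneous streams, comes from the description above.

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object EdgeStreams {
  type Edge = (String, String)

  // Hypothetical payload types: stand-ins for the real Avro-generated classes.
  case class OrderEvent(id: String, user: String)
  case class ClickEvent(session: String, url: String)

  // Stand-in decoders; the real ones would use Avro (e.g. a SpecificDatumReader)
  // and return None for records that fail the business-rule filter.
  def decodeOrder(bytes: Array[Byte]): Option[OrderEvent] =
    new String(bytes, "UTF-8").split(',') match {
      case Array(id, user) => Some(OrderEvent(id, user))
      case _               => None
    }

  def decodeClick(bytes: Array[Byte]): Option[ClickEvent] =
    new String(bytes, "UTF-8").split(',') match {
      case Array(session, url) => Some(ClickEvent(session, url))
      case _                   => None
    }

  // One direct stream per topic, values kept as raw bytes.
  def raw(ssc: StreamingContext, topic: String,
          params: Map[String, String]): DStream[(String, Array[Byte])] =
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, params, Set(topic))

  // Each topic gets its own deserializer and its own mapping to Edge;
  // only then are the homogeneous DStream[Edge]s united.
  def edges(ssc: StreamingContext, params: Map[String, String]): DStream[Edge] = {
    val a = raw(ssc, "A", params).flatMap { case (_, bytes) => decodeOrder(bytes) }
              .map(o => (o.id, o.user))
    val b = raw(ssc, "B", params).flatMap { case (_, bytes) => decodeClick(bytes) }
              .map(c => (c.session, c.url))
    a union b
  }
}
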
08.09.2015, 19:11, "Cody Koeninger" <[email protected]>:
> I'm not 100% sure what's going on there, but why are you doing a union in the
> first place?
>
> If you want multiple topics in a stream, just pass them all in the set of
> topics to one call to createDirectStream.
>
> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <[email protected]> wrote:
>> Ok.
>> Spark 1.4.1 on YARN.
>>
>> Here is my application.
>> I have 4 different Kafka topics (different object streams):
>>
>> type Edge = (String, String)
>>
>> val a = KafkaUtils.createDirectStream[...](ssc, "A", params).filter( nonEmpty ).map( toEdge )
>> val b = KafkaUtils.createDirectStream[...](ssc, "B", params).filter( nonEmpty ).map( toEdge )
>> val c = KafkaUtils.createDirectStream[...](ssc, "C", params).filter( nonEmpty ).map( toEdge )
>>
>> val u = a union b union c
>>
>> val source = u.window(Seconds(600), Seconds(10))
>>
>> val z = KafkaUtils.createDirectStream[...](ssc, "Z", params).filter( nonEmpty ).map( toEdge )
>>
>> val joinResult = source.rightOuterJoin( z )
>> joinResult.foreachRDD { rdd =>
>>   rdd.foreachPartition { partition =>
>>     .... // save to result topic in kafka
>>   }
>> }
>>
>> The 'window' operation in the code above is constantly growing,
>> no matter how many events appear in the corresponding Kafka topics.
>>
>> But if I change one line from
>>
>> val source = u.window(Seconds(600), Seconds(10))
>>
>> to
>>
>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>>
>> val source = u.transform(_.partitionBy(partitioner.value))
>>   .window(Seconds(600), Seconds(10))
>>
>> everything works perfectly.
>>
>> Perhaps the problem was in WindowedDStream.
>>
>> I forced it to use PartitionerAwareUnionRDD (by applying partitionBy with the
>> same partitioner) instead of UnionRDD.
>>
>> Nonetheless, I did not see any hints about such behaviour in the docs.
>> Is it a bug or absolutely normal behaviour?
>>
>> 08.09.2015, 17:03, "Cody Koeninger" <[email protected]>:
>>
>>> Can you provide more info (what version of Spark, a code example)?
>>>
>>> On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <[email protected]> wrote:
>>>> Hi,
>>>>
>>>> I have an application with 2 streams, which are joined together.
>>>> Stream1 is a simple DStream (relatively small batch chunks).
>>>> Stream2 is a windowed DStream (with a duration of, for example, 60 seconds).
>>>>
>>>> Stream1 and Stream2 are Kafka direct streams.
>>>> The problem is that, according to the logs, the window operation is
>>>> constantly increasing (screen:
>>>> http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
>>>> I also see a gap in the processing window (screen:
>>>> http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
>>>> in the logs there are no events in that period.
>>>> So what happens in that gap, and why is the window constantly increasing?
>>>>
>>>> Thank you in advance
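
For reference, a minimal self-contained sketch of the workaround described in the thread, assuming a StreamingContext ssc and a united pair stream u as in the snippet above (the partition count 8 and the broadcast follow Alexey's code; the comments describe WindowedDStream's union logic in Spark 1.4):

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

// window() unions the RDDs of every batch that falls inside the window.
// If all of those RDDs report the same partitioner, WindowedDStream builds
// a PartitionerAwareUnionRDD (the partition count stays fixed); otherwise
// it builds a plain UnionRDD, whose partition count is the sum over all
// batches in the window (up to 60 batches' worth here: 600s / 10s slide).
def windowedWithStablePartitioning(
    ssc: StreamingContext,
    u: DStream[(String, String)]): DStream[(String, String)] = {
  // HashPartitioners with the same partition count compare equal, which is
  // exactly what the PartitionerAwareUnionRDD check looks at; broadcasting
  // just avoids shipping a fresh partitioner object with every closure.
  val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
  u.transform(_.partitionBy(partitioner.value))
    .window(Seconds(600), Seconds(10))
}

The essential part is that every batch RDD entering the window carries an equal partitioner, so the partitioner-aware branch is taken and each windowed RDD keeps a fixed 8 partitions instead of accumulating one set of partitions per batch.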
