That's correct, I have a 10-second batch. The problem is actually the processing time: it increases constantly, no matter how small or large my window duration is. I am trying to prepare some example code to clarify my use case.
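In the meantime, here is a condensed sketch of the job, first with the plain window, then with the partitionBy workaround. This is a rough sketch only, not our production code: kafkaParams, nonEmpty and toEdge stand in for our real Kafka config and Avro-decoding logic.

    import kafka.serializer.DefaultDecoder
    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    type Edge = (String, String)

    // one direct stream per topic; kafkaParams / nonEmpty / toEdge are placeholders
    def edges(ssc: StreamingContext, topic: String) =
      KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
          ssc, kafkaParams, Set(topic))
        .filter(nonEmpty)
        .map(toEdge)

    val u = edges(ssc, "A") union edges(ssc, "B") union edges(ssc, "C")

    // original version - processing time of the window keeps growing:
    // val source = u.window(Seconds(600), Seconds(10))

    // workaround - same partitioner on every parent before windowing:
    val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
    val source = u.transform(_.partitionBy(partitioner.value)).window(Seconds(600), Seconds(10))

    val z = edges(ssc, "Z")
    val joinResult = source.rightOuterJoin(z)

With the commented-out window line the delay keeps climbing; with the partitionBy variant it stays flat.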
09.09.2015, 17:04, "Cody Koeninger" <c...@koeninger.org>:
> It looked like from your graphs that you had a 10-second batch time, but that your processing time was consistently 11 seconds. If that's correct, then yes, your delay is going to keep growing. You'd need to either increase your batch time or get your processing time down (by adding more resources or by changing your code).
>
> I'd expect adding a repartition / shuffle to increase processing time, not decrease it. What are you seeing after adding the partitionBy call?
>
> On Tue, Sep 8, 2015 at 5:33 PM, Alexey Ponkin <alexey.pon...@ya.ru> wrote:
>> Oh my, I implemented one directStream instead of a union of three, but it is still growing exponentially with the window method.
>>
>> 08.09.2015, 23:53, "Cody Koeninger" <c...@koeninger.org>:
>>> Yeah, that's the general idea.
>>>
>>> When you say hard-code the topic name, do you mean Set(topicA, topicB, topicC)? You should be able to use a variable for that - read it from a config file, whatever.
>>>
>>> If you're talking about the match statement, yeah, you'd need to hard-code your cases.
>>>
>>> On Tue, Sep 8, 2015 at 3:49 PM, Alexey Ponkin <alexey.pon...@ya.ru> wrote:
>>>> Ok, I got it!
>>>> But it seems that I need to hard-code the topic names.
>>>>
>>>> Something like this?
>>>>
>>>> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
>>>>     DefaultDecoder, DefaultDecoder](
>>>>       ssc, kafkaParams, Set(topicA, topicB, topicC))
>>>>   .transform { rdd =>
>>>>     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>     rdd.mapPartitionsWithIndex(
>>>>       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
>>>>         offsetRanges(idx).topic match {
>>>>           case "topicA" => ...
>>>>           case "topicB" => ...
>>>>           case _ => ...
>>>>         }
>>>>     )
>>>>   }
>>>>
>>>> 08.09.2015, 19:21, "Cody Koeninger" <c...@koeninger.org>:
>>>>> That doesn't really matter. With the direct stream you'll get all objects for a given topic-partition in the same Spark partition. You know which topic it's from via HasOffsetRanges. Then you can deserialize appropriately based on the topic.
>>>>>
>>>>> On Tue, Sep 8, 2015 at 11:16 AM, Alexey Ponkin <alexey.pon...@ya.ru> wrote:
>>>>>> The thing is that these topics contain absolutely different Avro objects (Array[Byte]) that I need to deserialize into different Java (Scala) objects, filter, and then map to tuples (String, String). So I have 3 streams with different Avro objects in them. I need to convert them (using some business rules) to pairs and unite them.
>>>>>>
>>>>>> 08.09.2015, 19:11, "Cody Koeninger" <c...@koeninger.org>:
>>>>>>> I'm not 100% sure what's going on there, but why are you doing a union in the first place?
>>>>>>>
>>>>>>> If you want multiple topics in a stream, just pass them all in the set of topics to one call to createDirectStream.
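[Replying inline, for the archives: one call with all the topics is what the match snippet above turned into. A fleshed-out sketch of that variant - decodeA and decodeB here are hypothetical stand-ins for our real Avro deserializers returning Edge pairs:]

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    val single = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
        ssc, kafkaParams, Set("topicA", "topicB"))
      .transform { rdd =>
        // each Spark partition corresponds 1:1 to a Kafka topic-partition,
        // so offsetRanges(idx).topic tells us which deserializer to apply
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.mapPartitionsWithIndex { (idx, itr) =>
          offsetRanges(idx).topic match {
            case "topicA" => itr.map { case (_, bytes) => decodeA(bytes) }
            case "topicB" => itr.map { case (_, bytes) => decodeB(bytes) }
            case _        => Iterator.empty
          }
        }
      }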
>>>>>>>
>>>>>>> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <alexey.pon...@ya.ru> wrote:
>>>>>>>> Ok.
>>>>>>>> Spark 1.4.1 on YARN.
>>>>>>>>
>>>>>>>> Here is my application.
>>>>>>>> I have 4 different Kafka topics (different object streams):
>>>>>>>>
>>>>>>>> type Edge = (String, String)
>>>>>>>>
>>>>>>>> val a = KafkaUtils.createDirectStream[...](ssc, "A", params).filter( nonEmpty ).map( toEdge )
>>>>>>>> val b = KafkaUtils.createDirectStream[...](ssc, "B", params).filter( nonEmpty ).map( toEdge )
>>>>>>>> val c = KafkaUtils.createDirectStream[...](ssc, "C", params).filter( nonEmpty ).map( toEdge )
>>>>>>>>
>>>>>>>> val u = a union b union c
>>>>>>>>
>>>>>>>> val source = u.window(Seconds(600), Seconds(10))
>>>>>>>>
>>>>>>>> val z = KafkaUtils.createDirectStream[...](ssc, "Z", params).filter( nonEmpty ).map( toEdge )
>>>>>>>>
>>>>>>>> val joinResult = source.rightOuterJoin( z )
>>>>>>>> joinResult.foreachRDD { rdd =>
>>>>>>>>   rdd.foreachPartition { partition =>
>>>>>>>>     .... // save to result topic in kafka
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The processing time of the 'window' operation in the code above grows constantly, no matter how many events appear in the corresponding Kafka topics.
>>>>>>>>
>>>>>>>> But if I change one line, from
>>>>>>>>
>>>>>>>> val source = u.window(Seconds(600), Seconds(10))
>>>>>>>>
>>>>>>>> to
>>>>>>>>
>>>>>>>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>>>>>>>>
>>>>>>>> val source = u.transform(_.partitionBy(partitioner.value)).window(Seconds(600), Seconds(10))
>>>>>>>>
>>>>>>>> everything works perfectly.
>>>>>>>>
>>>>>>>> Perhaps the problem was in WindowedDStream:
>>>>>>>> by applying partitionBy with the same partitioner everywhere, I forced it to use PartitionerAwareUnionRDD instead of UnionRDD.
>>>>>>>>
>>>>>>>> Nonetheless, I did not see any hints about such behaviour in the docs.
>>>>>>>> Is it a bug or absolutely normal behaviour?
>>>>>>>>
>>>>>>>> 08.09.2015, 17:03, "Cody Koeninger" <c...@koeninger.org>:
>>>>>>>>> Can you provide more info (what version of Spark, a code example)?
>>>>>>>>>
>>>>>>>>> On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <alexey.pon...@ya.ru> wrote:
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I have an application with 2 streams, which are joined together.
>>>>>>>>>> Stream1 is a simple DStream (with relatively small batch chunks).
>>>>>>>>>> Stream2 is a windowed DStream (with a duration of, for example, 60 seconds).
>>>>>>>>>>
>>>>>>>>>> Stream1 and Stream2 are Kafka direct streams.
>>>>>>>>>> The problem is that, according to the logs, the processing time of the window operation is constantly increasing (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
>>>>>>>>>> I also see a gap in the processing window (screenshot: http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php); in the logs there are no events in that period.
>>>>>>>>>> So what happens in that gap, and why is the window constantly increasing?
>>>>>>>>>>
>>>>>>>>>> Thank you in advance