Thank you very much for the great answer!

--
Yandex.Mail — reliable mail
http://mail.yandex.ru/neo2/collect/?exp=1&t=1
08.09.2015, 23:53, "Cody Koeninger" <c...@koeninger.org>:
> Yeah, that's the general idea.
>
> When you say hard-code topic name, do you mean Set(topicA, topicB, topicC)?
> You should be able to use a variable for that - read it from a config file,
> whatever.
>
> If you're talking about the match statement, yeah, you'd need to hardcode
> your cases.
>
> On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей <alexey.pon...@ya.ru> wrote:
>> Ok, I got it!
>> But it seems that I need to hard-code the topic names.
>>
>> Something like this?
>>
>> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
>>   DefaultDecoder, DefaultDecoder](
>>     ssc, kafkaParams, Set(topicA, topicB, topicC))
>>   .transform { rdd =>
>>     val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>     rdd.mapPartitionsWithIndex(
>>       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
>>         offsetRange(idx).topic match {
>>           case "topicA" => ...
>>           case "topicB" => ...
>>           case _        => ...
>>         }
>>     )
>>   }
>>
>> 08.09.2015, 19:21, "Cody Koeninger" <c...@koeninger.org>:
>>> That doesn't really matter. With the direct stream you'll get all objects
>>> for a given topic-partition in the same Spark partition. You know what
>>> topic it's from via HasOffsetRanges. Then you can deserialize
>>> appropriately based on topic.
>>>
>>> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей <alexey.pon...@ya.ru>
>>> wrote:
>>>> The thing is that these topics contain absolutely different AVRO
>>>> objects (Array[Byte]) that I need to deserialize into different
>>>> Java (Scala) objects, filter, and then map to tuples (String, String).
>>>> So I have 3 streams, each with a different Avro object in it. I need to
>>>> cast them (using some business rules) to pairs and unite them.
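[Editor's note] The match-on-topic pattern above can be illustrated without Spark. In this minimal sketch, `EventA`/`EventB` and the `decode*` functions are hypothetical stand-ins for the real Avro deserializers; only the dispatch-by-topic-name idea is taken from the thread:

```scala
// Spark-free sketch of per-topic deserialization dispatch. The case classes
// and decoders below are illustrative placeholders; in the real job they would
// be Avro-generated classes and SpecificDatumReaders.
object TopicDispatch {
  sealed trait Event
  case class EventA(payload: String) extends Event
  case class EventB(payload: String) extends Event

  // Stand-in decoders: here they just interpret the bytes as UTF-8 text.
  def decodeA(bytes: Array[Byte]): Event = EventA(new String(bytes, "UTF-8"))
  def decodeB(bytes: Array[Byte]): Event = EventB(new String(bytes, "UTF-8"))

  // The same match-on-topic-name used inside mapPartitionsWithIndex above:
  // the topic for partition idx comes from offsetRanges(idx).topic.
  def decode(topic: String, bytes: Array[Byte]): Event = topic match {
    case "topicA" => decodeA(bytes)
    case "topicB" => decodeB(bytes)
    case other    => sys.error(s"unexpected topic: $other")
  }
}
```

Inside the stream, `decode(offsetRange(idx).topic, bytes)` would be applied to every record of the partition's iterator.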
>>>>
>>>> 08.09.2015, 19:11, "Cody Koeninger" <c...@koeninger.org>:
>>>>
>>>>> I'm not 100% sure what's going on there, but why are you doing a union
>>>>> in the first place?
>>>>>
>>>>> If you want multiple topics in a stream, just pass them all in the set
>>>>> of topics to one call to createDirectStream.
>>>>>
>>>>> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <alexey.pon...@ya.ru>
>>>>> wrote:
>>>>>> Ok.
>>>>>> Spark 1.4.1 on YARN.
>>>>>>
>>>>>> Here is my application.
>>>>>> I have 4 different Kafka topics (different object streams):
>>>>>>
>>>>>> type Edge = (String, String)
>>>>>>
>>>>>> val a = KafkaUtils.createDirectStream[...](sc, "A", params).filter( nonEmpty ).map( toEdge )
>>>>>> val b = KafkaUtils.createDirectStream[...](sc, "B", params).filter( nonEmpty ).map( toEdge )
>>>>>> val c = KafkaUtils.createDirectStream[...](sc, "C", params).filter( nonEmpty ).map( toEdge )
>>>>>>
>>>>>> val u = a union b union c
>>>>>>
>>>>>> val source = u.window(Seconds(600), Seconds(10))
>>>>>>
>>>>>> val z = KafkaUtils.createDirectStream[...](sc, "Z", params).filter( nonEmpty ).map( toEdge )
>>>>>>
>>>>>> val joinResult = source.rightOuterJoin( z )
>>>>>> joinResult.foreachRDD { rdd =>
>>>>>>   rdd.foreachPartition { partition =>
>>>>>>     ... // save to result topic in Kafka
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> The 'window' operation time in the code above is constantly growing,
>>>>>> no matter how many events appear in the corresponding Kafka topics,
>>>>>>
>>>>>> but if I change one line from
>>>>>>
>>>>>> val source = u.window(Seconds(600), Seconds(10))
>>>>>>
>>>>>> to
>>>>>>
>>>>>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>>>>>>
>>>>>> val source = u.transform( _.partitionBy(partitioner.value) ).window(Seconds(600), Seconds(10))
>>>>>>
>>>>>> everything works perfectly.
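[Editor's note] The fix hinges on `partitionBy(new HashPartitioner(8))`, which keys every `(String, String)` edge to one of 8 partitions. The following Spark-free sketch mirrors the partition-assignment formula Spark's `HashPartitioner` uses (a non-negative mod of the key's `hashCode`); the object and method names here are illustrative, not Spark's own:

```scala
// Sketch of HashPartitioner(8) semantics: equal keys always map to the same
// partition index in [0, numPartitions). Illustrative names, not Spark API.
object HashPartitionerSketch {
  val numPartitions = 8

  // A mod that never goes negative, even when hashCode is negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  def getPartition(key: Any): Int = nonNegativeMod(key.hashCode, numPartitions)
}
```

Because every parent RDD of the union is partitioned the same way, the subsequent window can combine them without a shuffle on each slide.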
>>>>>>
>>>>>> Perhaps the problem was in WindowedDStream.
>>>>>>
>>>>>> I forced it to use PartitionerAwareUnionRDD (by applying partitionBy
>>>>>> with the same partitioner) instead of UnionRDD.
>>>>>>
>>>>>> Nonetheless, I did not see any hints about such behaviour in the docs.
>>>>>> Is it a bug or absolutely normal behaviour?
>>>>>>
>>>>>> 08.09.2015, 17:03, "Cody Koeninger" <c...@koeninger.org>:
>>>>>>
>>>>>>> Can you provide more info (what version of Spark, a code example)?
>>>>>>>
>>>>>>> On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <alexey.pon...@ya.ru>
>>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have an application with 2 streams, which are joined together.
>>>>>>>> Stream1 is a simple DStream (relatively small batch chunks).
>>>>>>>> Stream2 is a windowed DStream (with a duration of, for example, 60 seconds).
>>>>>>>>
>>>>>>>> Stream1 and Stream2 are Kafka direct streams.
>>>>>>>> The problem is that, according to the logs, the window operation time
>>>>>>>> is constantly increasing (screenshot:
>>>>>>>> http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
>>>>>>>> I also see a gap in the processing window (screenshot:
>>>>>>>> http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
>>>>>>>> in the logs there are no events in that period.
>>>>>>>> So what happens in that gap, and why is the window time constantly
>>>>>>>> increasing?
>>>>>>>>
>>>>>>>> Thank you in advance
>>>>>>>>
>>>>>>>> ---------------------------------------------------------------------
>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
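[Editor's note] To see why `window(Seconds(600), Seconds(10))` is heavy, it helps to work out how many batch RDDs each windowed RDD combines. This is plain arithmetic with no Spark dependency; the values mirror the snippet in the thread (10 s batches, 600 s window), and the object name is illustrative:

```scala
// Arithmetic behind window(Seconds(600), Seconds(10)) with a 10 s batch
// interval: every slide re-unions windowLength / batchInterval parent RDDs.
object WindowSketch {
  val batchIntervalSec = 10
  val windowLengthSec  = 600

  // Number of parent (batch) RDDs combined into one windowed RDD.
  val rddsPerWindow: Int = windowLengthSec / batchIntervalSec

  // Which batch indices a window ending at batch `end` covers
  // (clamped at 0 while the stream is still warming up).
  def batchesInWindow(end: Int): Range =
    math.max(0, end - rddsPerWindow + 1) to end
}
```

With 60 RDDs unioned on every 10-second slide, co-partitioning the parents (so the union can be partitioner-aware, as discussed above) plausibly matters far more than the event volume does.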