The thing is that these topics contain completely different Avro objects
(Array[Byte]) that I need to deserialize into different Java (Scala)
objects, filter, and then map to a tuple (String, String). So I have 3 streams
with a different Avro object type in each. I need to convert them (using some
business rules) to pairs and union them.
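
Roughly, the per-topic decoding I have in mind looks like this (just a sketch;
EventA, its getSrc/getDst fields, and the filter rule are placeholders for my
actual Avro-generated classes and business rules):

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

type Edge = (String, String)

// EventA stands in for the Avro-generated SpecificRecord class of topic A
def decodeA(bytes: Array[Byte]): EventA = {
  val reader = new SpecificDatumReader[EventA](EventA.getClassSchema)
  reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
}

val a: DStream[Edge] = KafkaUtils
  .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, params, Set("A"))
  .map { case (_, bytes) => decodeA(bytes) }            // deserialize topic A's Avro type
  .filter(e => e.getSrc != null && e.getDst != null)    // business-rule filter
  .map(e => (e.getSrc.toString, e.getDst.toString))     // map to the common Edge type
// topics B and C follow the same pattern with their own event classes,
// then: a union b union c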



08.09.2015, 19:11, "Cody Koeninger" <[email protected]>:
> I'm not 100% sure what's going on there, but why are you doing a union in the 
> first place?
>
> If you want multiple topics in a stream, just pass them all in the set of
> topics to one call to createDirectStream.
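>
> For example (a sketch, assuming the topics share a key/value format; params
> is your kafkaParams map):
>
> val all = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
>   ssc, params, Set("A", "B", "C"))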
>
> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <[email protected]> wrote:
>> Ok.
>> Spark 1.4.1 on YARN.
>>
>> Here is my application
>> I have 4 different Kafka topics (different object streams)
>>
>> type Edge = (String, String)
>>
>> val a = KafkaUtils.createDirectStream[...](ssc, params, Set("A")).filter(nonEmpty).map(toEdge)
>> val b = KafkaUtils.createDirectStream[...](ssc, params, Set("B")).filter(nonEmpty).map(toEdge)
>> val c = KafkaUtils.createDirectStream[...](ssc, params, Set("C")).filter(nonEmpty).map(toEdge)
>>
>> val u = a union b union c
>>
>> val source = u.window(Seconds(600), Seconds(10))
>>
>> val z = KafkaUtils.createDirectStream[...](ssc, params, Set("Z")).filter(nonEmpty).map(toEdge)
>>
>> val joinResult = source.rightOuterJoin( z )
>> joinResult.foreachRDD { rdd =>
>>   rdd.foreachPartition { partition =>
>>     .... // save to result topic in kafka
>>   }
>> }
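>>
>> The elided body is just the usual per-partition producer pattern, roughly
>> like this (a sketch assuming the new Kafka producer API; the broker address,
>> "result" topic name, and value formatting are placeholders):
>>
>> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
>>
>> rdd.foreachPartition { partition =>
>>   val props = new java.util.Properties()
>>   props.put("bootstrap.servers", "broker1:9092") // placeholder broker list
>>   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>>   props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>>   val producer = new KafkaProducer[String, String](props)
>>   // rightOuterJoin yields (key, (Option[left], right)) pairs
>>   partition.foreach { case (key, (left, right)) =>
>>     producer.send(new ProducerRecord[String, String]("result", key, s"$left -> $right"))
>>   }
>>   producer.close()
>> }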
>>
>> The processing time of the 'window' operation in the code above grows
>> constantly, no matter how many events arrive in the corresponding Kafka topics,
>>
>> but if I change one line from
>>
>> val source = u.window(Seconds(600), Seconds(10))
>>
>> to
>>
>> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
>>
>> val source = u.transform(_.partitionBy(partitioner.value)).window(Seconds(600), Seconds(10))
>>
>> everything works perfectly.
>>
>> Perhaps the problem was in WindowedDStream.
>>
>> By applying partitionBy with the same partitioner to every RDD, I effectively
>> forced Spark to use PartitionerAwareUnionRDD instead of UnionRDD.
>>
>> Nonetheless, I did not see any hints about such behaviour in the docs.
>> Is it a bug, or completely normal behaviour?
>>
>> 08.09.2015, 17:03, "Cody Koeninger" <[email protected]>:
>>
>>>  Can you provide more info (what version of Spark, a code example)?
>>>
>>>  On Tue, Sep 8, 2015 at 8:18 AM, Alexey Ponkin <[email protected]> wrote:
>>>>  Hi,
>>>>
>>>>  I have an application with 2 streams, which are joined together.
>>>>  Stream1 is a simple DStream (relatively small batch chunks).
>>>>  Stream2 is a windowed DStream (with a duration of, for example, 60 seconds).
>>>>
>>>>  Stream1 and Stream2 are Kafka direct streams.
>>>>  The problem is that, according to the logs, the processing time of the
>>>> window operation is constantly increasing (screenshot:
>>>> http://en.zimagez.com/zimage/screenshot2015-09-0816-06-55.php).
>>>>  I also see a gap in the processing window (screenshot:
>>>> http://en.zimagez.com/zimage/screenshot2015-09-0816-10-22.php);
>>>> in the logs there are no events during that period.
>>>>  So what happens in that gap, and why is the window constantly increasing?
>>>>
>>>>  Thank you in advance
>>>>
