I'm not 100% sure what's going on there, but why are you doing a union in
the first place?

If you want multiple topics in a single stream, just pass them all in the
set of topics to one call to createDirectStream.
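
Something along these lines is what I have in mind. Treat it as a rough
sketch rather than a drop-in replacement: it assumes String keys and values
decoded with StringDecoder, a broker at localhost:9092, and a made-up toEdge
that parses "src,dst" messages, none of which come from your code.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MultiTopicStream {
  type Edge = (String, String)

  // Hypothetical parser: assumes each message value looks like "src,dst".
  def toEdge(value: String): Edge = {
    val parts = value.split(",", 2)
    (parts(0), if (parts.length > 1) parts(1) else "")
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("multi-topic-direct-stream")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Assumed broker address; replace with your own.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // One direct stream over all three topics, so no union is needed.
    val source = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("A", "B", "C"))
      .map(_._2)                              // keep the value, drop the key
      .filter(v => v != null && v.nonEmpty)   // skip empty messages
      .map(toEdge)
      .window(Seconds(600), Seconds(10))

    // Any output operation will do; this one prints the window size.
    source.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The windowed stream is then built on a single KafkaRDD per batch instead of a
union of three, which should sidestep whatever the union is doing here.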

On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin <alexey.pon...@ya.ru> wrote:

> Ok.
> Spark 1.4.1 on YARN.
> Here is my application:
> I have 4 different Kafka topics (different object streams)
> type Edge = (String,String)
> val a = KafkaUtils.createDirectStream[...](sc,"A",params)
>   .filter( nonEmpty ).map( toEdge )
> val b = KafkaUtils.createDirectStream[...](sc,"B",params)
>   .filter( nonEmpty ).map( toEdge )
> val c = KafkaUtils.createDirectStream[...](sc,"C",params)
>   .filter( nonEmpty ).map( toEdge )
> val u = a union b union c
> val source = u.window(Seconds(600), Seconds(10))
> val z = KafkaUtils.createDirectStream[...](sc,"Z",params)
>   .filter( nonEmpty ).map( toEdge )
> val joinResult = source.rightOuterJoin( z )
> joinResult.foreachRDD { rdd=>
>   rdd.foreachPartition { partition =>
>      .... // save to result topic in kafka
>    }
>  }
> The 'window' operation in the code above is constantly growing,
> no matter how many events appear in the corresponding Kafka topics.
> But if I change one line from
> val source = u.window(Seconds(600), Seconds(10))
> to
> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
> val source = u.transform(_.partitionBy(partitioner.value))
>   .window(Seconds(600), Seconds(10))
> everything works perfectly.
> Perhaps the problem was in WindowedDStream:
> I forced it to use PartitionerAwareUnionRDD (by calling partitionBy with
> the same partitioner) instead of UnionRDD.
> Nonetheless, I did not see any hints about such behaviour in the docs.
> Is it a bug or absolutely normal behaviour?
