Hi Praseetha,
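In order to check whether a DStream is empty, using the isEmpty method on
each RDD is correct. I think the problem here is calling
input1Pair.leftOuterJoin(input2Pair) inside foreachRDD.
I guess input1Pair comes from the transformation above, so it is a DStream,
not an RDD. The function passed to foreachRDD should only work on RDDs:
Spark Streaming does not support adding new DStream transformations after
the streaming context has started, which is likely why the app stops. In
this case, do any transformation with the x variable (an RDD) instead.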
If you reuse the input2Pair RDD a lot (for example in every batch), you can
consider caching it for better performance.
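A minimal sketch of this idea follows. It assumes Spark Streaming's Scala API and the input1Pair (DStream) / input2Pair (RDD) shapes from your code. The object and method names (PerBatchJoin, countPerKey, hasMultiMatch, joinPerBatch) are hypothetical, and the println calls only stand in for whatever the two branches really need to do. Unlike your version, it runs foreachRDD on input1Pair directly, so x is the current batch, and it recomputes the "more than one match" check per batch using RDD operations only:

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; the names are illustrative, not a Spark API.
object PerBatchJoin {

  // Number of input2 rows per key. input2 is static, so compute this once.
  def countPerKey[K: ClassTag, W: ClassTag](lookup: RDD[(K, W)]): RDD[(K, Long)] =
    lookup.mapValues(_ => 1L).reduceByKey(_ + _)

  // True if at least one key in this batch matches more than one input2 row.
  // Uses RDD operations only, so it is safe to call inside foreachRDD.
  def hasMultiMatch[K: ClassTag, V: ClassTag](
      batch: RDD[(K, V)],
      counts: RDD[(K, Long)]): Boolean = {
    val batchKeys = batch.keys.distinct().map(k => (k, ()))
    !batchKeys.join(counts).filter { case (_, (_, n)) => n > 1 }.isEmpty()
  }

  def joinPerBatch[K: ClassTag, V: ClassTag, W: ClassTag](
      input1Pair: DStream[(K, V)],
      input2Pair: RDD[(K, W)]): Unit = {
    input2Pair.cache() // reused by every batch
    val input2Counts = countPerKey(input2Pair).cache()

    // Register this output operation before StreamingContext.start().
    // Inside it, x is the RDD of the current batch; no DStream ops are created.
    input1Pair.foreachRDD { x =>
      if (hasMultiMatch(x, input2Counts)) {
        val out = x.rightOuterJoin(input2Pair)
        println(s"multi-match batch: ${out.count()} rows") // placeholder action
      } else {
        val out = x.leftOuterJoin(input2Pair)
        println(s"no multi-match in batch: ${out.count()} rows") // placeholder action
      }
    }
  }
}
```

Counting the input2 rows per key once, and caching that table, keeps the per-batch check down to joining the batch's distinct keys against the counts instead of re-joining all of input2 just to decide which branch to take.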

2016-06-20 19:30 GMT+07:00 Praseetha <[email protected]>:

>
> Hi Experts,
>
> I have 2 inputs, where the first input is a stream (say input1) and the
> second one is a batch (say input2). I want to figure out whether the keys in
> the first input match a single row or more than one row in the second
> input. The further transformations/logic depend on the number of matching
> rows, i.e. whether a single row matches or multiple rows match (for at least
> one key in the first input).
>
> if(single row matches){
>      // do some transformation
> }else{
>      // do some transformation
> }
>
> Code that I tried so far:
>
> val input1Pair = streamData.map(x => (x._1, x))
> val input2Pair = input2.map(x => (x._1, x))
> val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)}
> val result = joinData.mapValues {
>   case (_, Some(_)) => 1L // this input2 row matched a key from input1
>   case (_, None)    => 0L // no match; 0L keeps both branches Long
> }.reduceByKey(_ + _).filter(_._2 > 1)
>
> When I call result.print, it prints nothing if all the keys match only one
> row in input2. Since a DStream may consist of multiple RDDs, I am not sure
> how to figure out whether the DStream is empty.
>
> I tried using foreachRDD, but the streaming app stops abruptly.
>
> Inside foreachRDD I was performing transformations with other RDDs, like:
>
> result.foreachRDD { x =>
>   if (x.isEmpty) {
>     val out = input1Pair.leftOuterJoin(input2Pair)
>   } else {
>     val out = input1Pair.rightOuterJoin(input2Pair)
>   }
> }
>
> Can you please suggest a solution?
>
>
> Regds,
> --Praseetha
>
