Hi Experts,

I have two inputs: the first is a stream (say input1) and the second is a
batch (say input2). I want to figure out whether the keys in the first input
match a single row or more than one row in the second input. The further
transformations/logic depend on the number of matching rows: whether a
single row matches or multiple rows match (for at least one key in the first
input).

if(single row matches){
     // do some transformation
}else{
     // do some transformation
}
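
To make the intended check concrete, here is a minimal sketch of the same logic on plain Scala collections (no Spark involved). The names matchCounts and anyMultiMatch are hypothetical helpers introduced only for illustration; they are not part of the streaming job.

```scala
// Hypothetical helper: for each distinct key in input1, count how many
// rows in input2 carry the same key (0 when there is no match).
def matchCounts[K, V, W](input1: Seq[(K, V)], input2: Seq[(K, W)]): Map[K, Int] = {
  val rowsPerKey = input2.groupBy(_._1).map { case (k, rows) => (k, rows.size) }
  input1.map(_._1).distinct.map(k => k -> rowsPerKey.getOrElse(k, 0)).toMap
}

// Hypothetical helper: true when at least one key in input1 matches
// more than one row in input2.
def anyMultiMatch[K](counts: Map[K, Int]): Boolean =
  counts.values.exists(_ > 1)
```

The branch in the pseudocode above would then depend on anyMultiMatch being false (every key matches at most one row) or true (some key matches several rows).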

Code that I tried so far:

val input1Pair = streamData.map(x => (x._1, x))
val input2Pair = input2.map(x => (x._1, x))
val joinData = input1Pair.transform { x => input2Pair.leftOuterJoin(x) }
val result = joinData.mapValues {
    case (_, Some(_)) => 1L  // input2 row whose key is present in the batch
    case (_, None) => 0L     // input2 row with no matching key in the batch
}.reduceByKey(_ + _).filter(_._2 > 1)

I have written the code above. When I call result.print, it prints nothing
if every key matches only one row in input2. Since a DStream may contain
multiple RDDs, I am not sure how to figure out whether the DStream is empty
or not.

I tried using foreachRDD, but the streaming app stops abruptly.

Inside foreachRDD I was performing transformations with other RDDs, like this:

result.foreachRDD { x =>
    if (x.isEmpty) {
        val out = input1Pair.leftOuterJoin(input2Pair)
    } else {
        val out = input1Pair.rightOuterJoin(input2Pair)
    }
}
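
One possible direction, given only as a hedged sketch and not as a confirmed fix: make the decision per batch on the RDD that transform hands over, instead of touching the DStream from inside foreachRDD. The function name joinForBatch is hypothetical, String keys and values are assumed purely for illustration, and both branches are mapped to a common Option-based shape so that they return the same type.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper; String keys/values are an assumption for illustration.
// batch is one micro-batch of input1Pair, input2Pair is the static RDD.
def joinForBatch(
    batch: RDD[(String, String)],
    input2Pair: RDD[(String, String)]
): RDD[(String, (Option[String], Option[String]))] = {
  // Distinct keys of this batch, paired with a dummy value so they can be joined.
  val batchKeys = batch.keys.distinct().map(k => (k, ()))

  // Keys of this batch that match more than one row in input2Pair.
  val multiMatch = input2Pair
    .join(batchKeys)
    .mapValues(_ => 1L)
    .reduceByKey(_ + _)
    .filter(_._2 > 1)

  if (multiMatch.isEmpty()) {
    // Every key matches at most one row: left outer join.
    batch.leftOuterJoin(input2Pair).mapValues { case (v, w) => (Option(v), w) }
  } else {
    // At least one key matches several rows: right outer join.
    batch.rightOuterJoin(input2Pair).mapValues { case (v, w) => (v, Option(w)) }
  }
}
```

It could then be wired in as input1Pair.transform(batch => joinForBatch(batch, input2Pair)), assuming input1Pair is a DStream[(String, String)]. Note that isEmpty() is an action, so this runs an extra job for every batch.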

Could you please suggest how to handle this?


Regards,
--Praseetha
