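Hi Praseetha,

Calling isEmpty inside foreachRDD is the correct way to check whether the DStream is empty for a given batch. There, x is the RDD for one batch, so x.isEmpty is evaluated once per batch. I think the problem is the call input1Pair.lefOuterJoin(input2Pair), which should also be spelled leftOuterJoin. input1Pair comes from the streamData.map transformation above, so it is a DStream, not an RDD, while the code inside foreachRDD works on plain RDDs. Do the DStream-with-RDD join at the DStream level instead, for example with transform as you already do for joinData. Inside foreachRDD, do any transformation on the x variable instead. If you use the input2Pair RDD a lot, consider caching it for better performance.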
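Below is a minimal sketch of the per-batch approach described above. It is not tested against your job. It assumes the Spark 1.3+ Scala API (for RDD.isEmpty) and a hypothetical row type (String, String) with the key in _1. MatchCountSketch, hasMultiMatch, processBatch and wire are made-up names, and count() stands in for your "do some transformation". Instead of the leftOuterJoin plus 1/0 counting, it uses an inner join against the batch's distinct keys. That way, a key that arrives twice in the same batch is not mistaken for a key matching two rows of input2.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object MatchCountSketch {
  // Hypothetical row type: the key is in _1, as in streamData.map(x => (x._1, x)).
  type Row = (String, String)

  /** True if at least one key of this batch matches more than one row of input2Pair. */
  def hasMultiMatch(batch: RDD[(String, Row)], input2Pair: RDD[(String, Row)]): Boolean = {
    // Distinct keys, so a key repeated within the batch is only counted once.
    val batchKeys = batch.keys.distinct().map(k => (k, ()))
    val multi = input2Pair
      .join(batchKeys)     // one record per input2 row whose key occurs in the batch
      .mapValues(_ => 1L)
      .reduceByKey(_ + _)  // number of input2 rows per matched key
      .filter(_._2 > 1)
    !multi.isEmpty()
  }

  /** Uses only RDD operations, so it can be called from inside foreachRDD.
    * Returns the branch taken and a placeholder row count. */
  def processBatch(batch: RDD[(String, Row)], input2Pair: RDD[(String, Row)]): (String, Long) =
    if (hasMultiMatch(batch, input2Pair)) {
      val out = batch.rightOuterJoin(input2Pair) // RDD[(String, (Option[Row], Row))]
      ("right", out.count())                     // placeholder action
    } else {
      val out = batch.leftOuterJoin(input2Pair)  // RDD[(String, (Row, Option[Row]))]
      ("left", out.count())                      // placeholder action
    }

  def wire(input1Pair: DStream[(String, Row)], input2Pair: RDD[(String, Row)]): Unit = {
    input2Pair.cache() // reused by every batch
    input1Pair.foreachRDD { (batch: RDD[(String, Row)]) =>
      // Runs on the driver once per batch; batch is an RDD, not a DStream.
      if (!batch.isEmpty()) {
        val (branch, n) = processBatch(batch, input2Pair)
        println(s"$branch outer join produced $n rows")
      }
    }
  }
}
```

You would call MatchCountSketch.wire(input1Pair, input2Pair) before starting the StreamingContext. The decision is made once per batch, so a single key with several matches switches the whole batch to the right outer join, which is how I read the if/else in your question.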

2016-06-20 19:30 GMT+07:00 Praseetha:
>
> Hi Experts,
>
> I have 2 inputs, where first input is stream (say input1) and the second
> one is batch (say input2). I want to figure out if the keys in first input
> matches single row or more than one row in the second input. The further
> transformations/logic depends on the number of rows matching, whether
> single row matches or multiple rows match (for atleast one key in the first
> input)
>
> if(single row matches){
>   // do some tranformation
> }else{
>   // do some transformation
> }
>
> Code that i tried so far
>
> val input1Pair = streamData.map(x => (x._1, x))
> val input2Pair = input2.map(x => (x._1, x))
> val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)}
> val result = joinData.mapValues{
>   case(v, Some(a)) => 1L
>   case(v, None) => 0
> }.reduceByKey(_ + _).filter(_._2 > 1)
>
> I have done the above coding. When I do result.print, it prints nothing if
> all the keys matches only one row in the input2. With the fact that the
> DStream may have multiple RDDs, not sure how to figure out if the DStream
> is empty or not.
>
> I tried using foreachRDD, but the streaming app stops abruptly.
>
> Inside foreachRDD i was performing transformations with other RDDs. like,
>
> result.foreachRDD{ x=>
>   if(x.isEmpty){
>     val out = input1Pair.lefOuterJoin(input2Pair)
>   }else{
>     val out = input1Pair.rightOuterJoin(input2Pair)
>   }
>
> Can you please suggest.
>
>
> Regds,
> --Praseetha
>
