I have a JavaPairDStream whose RDDs contain pairs like <"hello", "world">.
I want to join it with a JavaPairRDD that holds a single item, <"hello",
"spark">.
I expect the joined result to be <"hello", ("world", "spark")>.
However, the result I get is <"hello", ("world", "world")>.
Is this a bug? Any suggestions would be much appreciated!
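For reference, the expected result follows from the semantics of `JavaPairRDD.join`. It is an inner join on keys. For each key present in both RDDs, it returns one `(key, (leftValue, rightValue))` pair per combination of values, with the calling RDD's value first.

The plain-Java sketch below reproduces those semantics for the single batch described above. It does not use Spark. `joinByKey` and `pair` are hypothetical helpers written only for this illustration, not Spark APIs.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Spark-free illustration of inner-join-by-key semantics.
// joinByKey and pair are hypothetical helpers, not Spark APIs.
class JoinSemantics {

    static Map.Entry<String, String> pair(String k, String v) {
        return new SimpleEntry<>(k, v);
    }

    // For every key present on both sides, emit (key, (leftValue, rightValue))
    // once per combination of matching values; unmatched keys are dropped.
    static List<Map.Entry<String, Map.Entry<String, String>>> joinByKey(
            List<Map.Entry<String, String>> left,
            List<Map.Entry<String, String>> right) {
        // Index the right-hand side by key (a simple hash join)
        Map<String, List<String>> rightByKey = new HashMap<>();
        for (Map.Entry<String, String> e : right) {
            List<String> values = rightByKey.get(e.getKey());
            if (values == null) {
                values = new ArrayList<>();
                rightByKey.put(e.getKey(), values);
            }
            values.add(e.getValue());
        }
        List<Map.Entry<String, Map.Entry<String, String>>> out = new ArrayList<>();
        for (Map.Entry<String, String> e : left) {
            List<String> matches = rightByKey.get(e.getKey());
            if (matches == null) {
                continue; // inner join: key missing on the right side
            }
            for (String r : matches) {
                // The left value comes first, the right value second
                Map.Entry<String, String> joinedValues = pair(e.getValue(), r);
                out.add(new SimpleEntry<String, Map.Entry<String, String>>(
                        e.getKey(), joinedValues));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batch = Arrays.asList(pair("hello", "world"));
        List<Map.Entry<String, String>> rdd1 = Arrays.asList(pair("hello", "spark"));
        for (Map.Entry<String, Map.Entry<String, String>> e : joinByKey(batch, rdd1)) {
            // prints: hello -> (world, spark)
            System.out.println(e.getKey() + " -> (" + e.getValue().getKey() + ", "
                    + e.getValue().getValue() + ")");
        }
    }
}
```

Under these semantics, ("world", "world") cannot come from joining these two inputs. The right-hand value should always come from rdd1.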

Below is the testing code.
======
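import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class TestJoin {
    public static void main(String[] args) {

        // Function that maps each incoming line to the pair (line, value_)
        class StringToPair implements PairFunction<String, String, String> {
            String value_;

            StringToPair(String value) {
                value_ = value;
            }

            @Override
            public Tuple2<String, String> call(String arg0) throws Exception {
                return new Tuple2<String, String>(arg0, value_);
            }
        }

        SparkConf sparkConf = new SparkConf().setAppName("TestJoin");

        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Static RDD holding the single pair ("hello", "spark"), created on sc
        final JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(
                Arrays.asList(new Tuple2<String, String>("hello", "spark")));

        // This constructor builds its own SparkContext from sparkConf,
        // separate from sc above
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
        JavaReceiverInputDStream<String> networkevents =
                jssc.socketTextStream("localhost", 9999);
        // Each received line (e.g. "hello") becomes the pair (line, "world")
        JavaPairDStream<String, String> streamEvents =
                networkevents.mapToPair(new StringToPair("world"));

        streamEvents.transformToPair(
                new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
            @Override
            public JavaPairRDD<String, String> call(JavaPairRDD<String, String> v1)
                    throws Exception {
                // Print this batch's joined pairs on the driver
                List<Tuple2<String, Tuple2<String, String>>> t = v1.join(rdd1).collect();
                System.out.println(t);

                // Keep only the joined values and swap them: (a, b) -> (b, a)
                return v1.join(rdd1).values().mapToPair(
                        new PairFunction<Tuple2<String, String>, String, String>() {
                    @Override
                    public Tuple2<String, String> call(Tuple2<String, String> t)
                            throws Exception {
                        return new Tuple2<String, String>(t._2, t._1);
                    }
                });
            }
        }).print();

        jssc.start();
        jssc.awaitTermination();
    }
}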
======
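Note that the final `mapToPair` in `transformToPair` swaps the joined values. If the join behaved as expected, `print()` would therefore show the swapped pair rather than ("world", "spark").

The Spark-free sketch below models only that last step. Dropping the key stands in for `values()`, and `(t._2, t._1)` becomes a swap. `swapValues` is a hypothetical helper for this illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Spark-free sketch of the last step inside transformToPair:
// values() discards the key, then mapToPair turns each (a, b) into (b, a).
// swapValues is a hypothetical helper, not a Spark API.
class SwapJoinedValues {

    static List<Map.Entry<String, String>> swapValues(
            List<Map.Entry<String, Map.Entry<String, String>>> joined) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, Map.Entry<String, String>> e : joined) {
            Map.Entry<String, String> v = e.getValue();           // like values(): key dropped
            out.add(new SimpleEntry<>(v.getValue(), v.getKey())); // like (t._2, t._1)
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Map.Entry<String, String>>> joined = new ArrayList<>();
        joined.add(new SimpleEntry<String, Map.Entry<String, String>>(
                "hello", new SimpleEntry<>("world", "spark")));
        for (Map.Entry<String, String> p : swapValues(joined)) {
            // prints: (spark,world)
            System.out.println("(" + p.getKey() + "," + p.getValue() + ")");
        }
    }
}
```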
Attached screenshot: <http://apache-spark-user-list.1001560.n3.nabble.com/file/n7906/join.png>



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DStream-join-with-a-RDD-v1-0-0-tp7906.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
