Hi All,

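Please see the code, input, and output below. I think the output is not
correct: the LEFT OUTER JOIN returns null department columns for deptid 20
and 30, even though both exist in Dept. Am I missing anything? Please guide.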

Code

// Imports needed to compile against the Spark 1.x Java API.
// Emp, Dept and GlobalSparkContext are application classes not shown here.
import java.text.ParseException; // assumed; the post does not say which ParseException
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;

public class Test {
        private static JavaSparkContext jsc = null;
        private static SQLContext sqlContext = null;
        private static Configuration hadoopConf = null;
        public static void main(String[] args) {
        
                jsc = GlobalSparkContext.getJavaSparkContext();
                sqlContext = GlobalSparkContext.getSQLContext(jsc);

                hadoopConf = new Configuration(jsc.hadoopConfiguration());
        
                hadoopConf.set("textinputformat.record.delimiter", GlobalSparkContext.lineSeparator);
                
                try {
                        final Emp emp = new Emp();
                        final Dept dept = new Dept();

                        // Read the Dept file as (offset, line) pairs, keep only the line text,
                        // then split each record into a Dept bean.
                        JavaPairRDD<LongWritable, Text> deptinputLines = jsc.newAPIHadoopFile(
                                        args[0], TextInputFormat.class, LongWritable.class, Text.class, hadoopConf);
                        JavaRDD<Dept> deptRDD = deptinputLines.map(
                                        new Function<Tuple2<LongWritable, Text>, String>() {
                                @Override
                                public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
                                        return arg0._2.toString();
                                }
                        }).map(new Function<String, Dept>() {
                                public Dept call(String recordLine) throws Exception {
                                        String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
                                        return getInstanceDept(parts, dept);
                                }
                        });

                        DataFrame deptDF = sqlContext.createDataFrame(deptRDD, Dept.class);
                        deptDF.registerTempTable("DEPT");
                        //deptDF.show();

                        // Same steps for the Emp file: read lines, then split each record into an Emp bean.
                        JavaPairRDD<LongWritable, Text> inputLines = jsc.newAPIHadoopFile(
                                        args[1], TextInputFormat.class, LongWritable.class, Text.class, hadoopConf);
                        JavaRDD<Emp> empRDD = inputLines.map(
                                        new Function<Tuple2<LongWritable, Text>, String>() {
                                private static final long serialVersionUID = 3371707560417405016L;

                                @Override
                                public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
                                        return arg0._2.toString();
                                }
                        }).map(new Function<String, Emp>() {
                                private static final long serialVersionUID = 7656942162815285622L;

                                public Emp call(String recordLine) throws Exception {
                                        String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
                                        return getInstance(parts, emp);
                                }
                        });
                        DataFrame empDF = sqlContext.createDataFrame(empRDD, Emp.class);
                        empDF.registerTempTable("EMP");

                        sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN DEPT d ON e.deptid = d.deptid").show();

                        //empDF.join(deptDF, empDF.col("deptid").equalTo(deptDF.col("deptid")), "leftouter").show();
                        
                }
                catch(Exception e){
                        System.out.println(e);
                }
        }
        public static Emp getInstance(String[] parts, Emp emp) throws ParseException {
                emp.setId(parts[0]);
                emp.setName(parts[1]);
                emp.setDeptid(parts[2]);
                
                return emp;
        }
        public static Dept getInstanceDept(String[] parts, Dept dept) throws ParseException {
                dept.setDeptid(parts[0]);
                dept.setDeptname(parts[1]);
                return dept;
        }
}

Input 
Emp
1001 aba 10
1002 abs 20
1003 abd 10
1004 abf 30
1005 abg 10
1006 abh 20
1007 abj 10
1008 abk 30
1009 abl 20
1010 abq 10

Dept
10 dev
20 Test
30 IT

Output
+------+------+----+------+--------+
|deptid|    id|name|deptid|deptname|
+------+------+----+------+--------+
|    10|  1001| aba|    10|     dev|
|    10|  1003| abd|    10|     dev|
|    10|  1005| abg|    10|     dev|
|    10|  1007| abj|    10|     dev|
|    10|  1010| abq|    10|     dev|
|    20|  1002| abs|  null|    null|
|    20|  1006| abh|  null|    null|
|    20|  1009| abl|  null|    null|
|    30|  1004| abf|  null|    null|
|    30|  1008| abk|  null|    null|
+------+------+----+------+--------+
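
For comparison, this is the result I expected. The sketch below works out the left outer join by hand with plain Java collections (no Spark) from the sample input above. The class and method names are made up for illustration and are not part of my job, and it assumes each deptid appears only once in Dept.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the Spark job: computes the expected
// LEFT OUTER JOIN of the sample Emp and Dept records by hand.
class ExpectedLeftOuterJoin {

    // Sample input copied from the Emp listing: id, name, deptid.
    static final String[][] EMP = {
        {"1001", "aba", "10"}, {"1002", "abs", "20"}, {"1003", "abd", "10"},
        {"1004", "abf", "30"}, {"1005", "abg", "10"}, {"1006", "abh", "20"},
        {"1007", "abj", "10"}, {"1008", "abk", "30"}, {"1009", "abl", "20"},
        {"1010", "abq", "10"}
    };

    // Sample input copied from the Dept listing: deptid, deptname.
    static final String[][] DEPT = {
        {"10", "dev"}, {"20", "Test"}, {"30", "IT"}
    };

    // Every Emp row is kept; the Dept columns are null only when no deptid matches.
    // Assumes deptid is unique in Dept, so a map lookup is enough.
    static List<String> join(String[][] emps, String[][] depts) {
        Map<String, String> deptNameById = new LinkedHashMap<>();
        for (String[] d : depts) {
            deptNameById.put(d[0], d[1]);
        }
        List<String> rows = new ArrayList<>();
        for (String[] e : emps) {
            String deptName = deptNameById.get(e[2]);
            String rightDeptId = (deptName == null) ? null : e[2];
            // Column order follows the output table: deptid|id|name|deptid|deptname
            rows.add(e[2] + "|" + e[0] + "|" + e[1] + "|" + rightDeptId + "|" + deptName);
        }
        return rows;
    }

    public static void main(String[] args) {
        for (String row : join(EMP, DEPT)) {
            System.out.println(row);
        }
    }
}
```

Since every deptid in Emp (10, 20, 30) is present in Dept, I expected no null values at all in the right-hand deptid and deptname columns.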


