Checked in spark-shell with Spark 1.5.0; the same left outer join matches every department:

scala> val emmpdat = sc.textFile("empfile");
emmpdat: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[23] at textFile at <console>:21

scala> case class EMP (id:Int , name : String , deptId: Int)
defined class EMP

scala> val empdf = emmpdat.map((f) => { val ff = f.split(" "); new EMP(ff(0).toInt, ff(1), ff(2).toInt) }).toDF
empdf: org.apache.spark.sql.DataFrame = [id: int, name: string, deptId: int]

scala> empdf.registerTempTable("myemp");

scala>

scala> val deptdat = sc.textFile("deptfile");
deptdat: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[27] at textFile at <console>:21

scala> case class DEPT (deptId:Int , deptName : String )
defined class DEPT

scala>

scala> val deptdf = deptdat.map((f) => { val ff = f.split(" "); new DEPT(ff(0).toInt, ff(1)) }).toDF
deptdf: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string]

scala> deptdf.registerTempTable("mydept");

scala>

scala> sqlContext.sql("SELECT * FROM myemp e LEFT OUTER JOIN mydept d ON e.deptid = d.deptid");
16/07/06 21:00:08 INFO parse.ParseDriver: Parsing command: SELECT * FROM myemp e LEFT OUTER JOIN mydept d ON e.deptid = d.deptid
16/07/06 21:00:08 INFO parse.ParseDriver: Parse Completed
res9: org.apache.spark.sql.DataFrame = [id: int, name: string, deptId: int, deptId: int, deptName: string]

scala> sqlContext.sql("SELECT * FROM myemp e LEFT OUTER JOIN mydept d ON e.deptid = d.deptid").show
16/07/06 21:00:15 INFO parse.ParseDriver: Parsing command: SELECT * FROM myemp e LEFT OUTER JOIN mydept d ON e.deptid = d.deptid
16/07/06 21:00:15 INFO parse.ParseDriver: Parse Completed
+----+----+------+------+--------+
|  id|name|deptId|deptId|deptName|
+----+----+------+------+--------+
|1001| aba|    10|    10|     dev|
|1003| abd|    10|    10|     dev|
|1005| abg|    10|    10|     dev|
|1007| abj|    10|    10|     dev|
|1010| abq|    10|    10|     dev|
|1002| abs|    20|    20|    Test|
|1006| abh|    20|    20|    Test|
|1009| abl|    20|    20|    Test|
|1004| abf|    30|    30|      IT|
|1008| abk|    30|    30|      IT|
+----+----+------+------+--------+


scala>
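
If the join keys really do carry stray spaces, trimming them during the parse
avoids the problem. A minimal sketch, assuming the same space-delimited files
as in the transcript above (the trim and whitespace split are my additions,
not the original code):

  val empdf = sc.textFile("empfile")
    .map(_.trim.split("\\s+"))                        // drop leading/trailing spaces, split on any run of whitespace
    .map(ff => EMP(ff(0).toInt, ff(1), ff(2).toInt))  // same EMP case class as above
    .toDF

Parsing the DEPT file the same way keeps both sides of the join key clean, so
the LEFT OUTER JOIN matches every row as shown above.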



On Wed, Jul 6, 2016 at 6:30 PM, ayan guha <[email protected]> wrote:

> Looks like a data issue to me. Either EMP or DEPT has spaces in the dept id
> for deptid = 20, 30.
>
> Did you check in the Hive CLI?
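>
> A quick way to confirm from spark-shell is to compare the key lengths on
> both sides (a hypothetical check, assuming deptid comes through as a
> string in the Java version):
>
>   sqlContext.sql("SELECT deptid, length(deptid) FROM EMP GROUP BY deptid, length(deptid)").show()
>   sqlContext.sql("SELECT deptid, length(deptid) FROM DEPT GROUP BY deptid, length(deptid)").show()
>
> A length longer than the digits themselves means stray whitespace in the key.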
>
> On Wed, Jul 6, 2016 at 10:33 PM, radha <[email protected]> wrote:
>
>> Hi All,
>>
>> Please check the code, input, and output below. I think the output is
>> not correct; am I missing anything? Please guide.
>>
>> Code
>>
>> public class Test {
>>     private static JavaSparkContext jsc = null;
>>     private static SQLContext sqlContext = null;
>>     private static Configuration hadoopConf = null;
>>
>>     public static void main(String[] args) {
>>
>>         jsc = GlobalSparkContext.getJavaSparkContext();
>>         sqlContext = GlobalSparkContext.getSQLContext(jsc);
>>
>>         hadoopConf = new Configuration(jsc.hadoopConfiguration());
>>         hadoopConf.set("textinputformat.record.delimiter",
>>                 GlobalSparkContext.lineSeparator);
>>
>>         try {
>>             final Emp emp = new Emp();
>>             final Dept dept = new Dept();
>>
>>             JavaPairRDD<LongWritable, Text> deptinputLines =
>>                     jsc.newAPIHadoopFile(args[0], TextInputFormat.class,
>>                             LongWritable.class, Text.class, hadoopConf);
>>             JavaRDD<Dept> deptRDD = deptinputLines
>>                     .map(new Function<Tuple2<LongWritable, Text>, String>() {
>>                         @Override
>>                         public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
>>                             return arg0._2.toString();
>>                         }
>>                     })
>>                     .map(new Function<String, Dept>() {
>>                         public Dept call(String recordLine) throws Exception {
>>                             String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
>>                             return getInstanceDept(parts, dept);
>>                         }
>>                     });
>>
>>             DataFrame deptDF = sqlContext.createDataFrame(deptRDD, Dept.class);
>>             deptDF.registerTempTable("DEPT");
>>             //deptDF.show();
>>
>>             JavaPairRDD<LongWritable, Text> inputLines =
>>                     jsc.newAPIHadoopFile(args[1], TextInputFormat.class,
>>                             LongWritable.class, Text.class, hadoopConf);
>>             JavaRDD<Emp> empRDD = inputLines
>>                     .map(new Function<Tuple2<LongWritable, Text>, String>() {
>>                         private static final long serialVersionUID = 3371707560417405016L;
>>
>>                         @Override
>>                         public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
>>                             return arg0._2.toString();
>>                         }
>>                     })
>>                     .map(new Function<String, Emp>() {
>>                         private static final long serialVersionUID = 7656942162815285622L;
>>
>>                         public Emp call(String recordLine) throws Exception {
>>                             String[] parts = recordLine.split(GlobalSparkContext.recordSeparator);
>>                             return getInstance(parts, emp);
>>                         }
>>                     });
>>
>>             DataFrame empDF = sqlContext.createDataFrame(empRDD, Emp.class);
>>             empDF.registerTempTable("EMP");
>>
>>             sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN DEPT d ON e.deptid = d.deptid").show();
>>
>>             //empDF.join(deptDF, empDF.col("deptid").equalTo(deptDF.col("deptid")), "leftouter").show();
>>
>>         } catch (Exception e) {
>>             System.out.println(e);
>>         }
>>     }
>>
>>     public static Emp getInstance(String[] parts, Emp emp) throws ParseException {
>>         emp.setId(parts[0]);
>>         emp.setName(parts[1]);
>>         emp.setDeptid(parts[2]);
>>         return emp;
>>     }
>>
>>     public static Dept getInstanceDept(String[] parts, Dept dept) throws ParseException {
>>         dept.setDeptid(parts[0]);
>>         dept.setDeptname(parts[1]);
>>         return dept;
>>     }
>> }
>>
>> Input
>> Emp
>> 1001 aba 10
>> 1002 abs 20
>> 1003 abd 10
>> 1004 abf 30
>> 1005 abg 10
>> 1006 abh 20
>> 1007 abj 10
>> 1008 abk 30
>> 1009 abl 20
>> 1010 abq 10
>>
>> Dept
>> 10 dev
>> 20 Test
>> 30 IT
>>
>> Output
>> +------+------+----+------+--------+
>> |deptid|    id|name|deptid|deptname|
>> +------+------+----+------+--------+
>> |    10|  1001| aba|    10|     dev|
>> |    10|  1003| abd|    10|     dev|
>> |    10|  1005| abg|    10|     dev|
>> |    10|  1007| abj|    10|     dev|
>> |    10|  1010| abq|    10|     dev|
>> |    20|  1002| abs|  null|    null|
>> |    20|  1006| abh|  null|    null|
>> |    20|  1009| abl|  null|    null|
>> |    30|  1004| abf|  null|    null|
>> |    30|  1008| abk|  null|    null|
>> +------+------+----+------+--------+
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
