Checked in spark-shell with Spark 1.5.0
scala> val emmpdat = sc.textFile("empfile");
emmpdat: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[23] at
textFile at <console>:21
scala> case class EMP (id:Int , name : String , deptId: Int)
defined class EMP
scala> val empdf = emmpdat.map((f) => {val ff=f.split(" ");new
EMP(ff(0).toInt,ff(1),ff(2).toInt)}).toDF
empdf: org.apache.spark.sql.DataFrame = [id: int, name: string, deptId: int]
scala> empdf.registerTempTable("myemp");
scala>
scala> val deptdat = sc.textFile("deptfile");
deptdat: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[27] at
textFile at <console>:21
scala> case class DEPT (deptId:Int , deptName : String )
defined class DEPT
scala>
scala> val deptdf = deptdat.map((f) => {val ff=f.split(" ");new
DEPT(ff(0).toInt,ff(1))}).toDF
deptdf: org.apache.spark.sql.DataFrame = [deptId: int, deptName: string]
scala> deptdf.registerTempTable("mydept");
scala>
scala> sqlContext.sql("SELECT * FROM myemp e LEFT OUTER JOIN mydept d ON
e.deptid = d.deptid");
16/07/06 21:00:08 INFO parse.ParseDriver: Parsing command: SELECT * FROM
myemp e LEFT OUTER JOIN mydept d ON e.deptid = d.deptid
16/07/06 21:00:08 INFO parse.ParseDriver: Parse Completed
res9: org.apache.spark.sql.DataFrame = [id: int, name: string, deptId: int,
deptId: int, deptName: string]
scala> sqlContext.sql("SELECT * FROM myemp e LEFT OUTER JOIN mydept d ON
e.deptid = d.deptid").show
16/07/06 21:00:15 INFO parse.ParseDriver: Parsing command: SELECT * FROM
myemp e LEFT OUTER JOIN mydept d ON e.deptid = d.deptid
16/07/06 21:00:15 INFO parse.ParseDriver: Parse Completed
+----+----+------+------+--------+
| id|name|deptId|deptId|deptName|
+----+----+------+------+--------+
|1001| aba| 10| 10| dev|
|1003| abd| 10| 10| dev|
|1005| abg| 10| 10| dev|
|1007| abj| 10| 10| dev|
|1010| abq| 10| 10| dev|
|1002| abs| 20| 20| Test|
|1006| abh| 20| 20| Test|
|1009| abl| 20| 20| Test|
|1004| abf| 30| 30| IT|
|1008| abk| 30| 30| IT|
+----+----+------+------+--------+
scala>
On Wed, Jul 6, 2016 at 6:30 PM, ayan guha <[email protected]> wrote:
> Looks like a data issue to me. Either EMP or DEPT has spaces in the dept id
> for deptid=20,30.
>
> Did you check in the Hive CLI?
>
> On Wed, Jul 6, 2016 at 10:33 PM, radha <[email protected]> wrote:
>
>> Hi All,
>>
>> Please check below for the code, input, and output. I think the output
>> is not correct — am I missing anything? Please guide.
>>
>> Code
>>
>> public class Test {
>> private static JavaSparkContext jsc = null;
>> private static SQLContext sqlContext = null;
>> private static Configuration hadoopConf = null;
>> public static void main(String[] args) {
>>
>> jsc = GlobalSparkContext.getJavaSparkContext();
>> sqlContext = GlobalSparkContext.getSQLContext(jsc);
>>
>> hadoopConf = new Configuration(jsc.hadoopConfiguration());
>>
>>
>> hadoopConf.set("textinputformat.record.delimiter",GlobalSparkContext.lineSeparator);
>>
>> try {
>> final Emp emp = new Emp();
>> final Dept dept = new Dept();
>>
>> JavaPairRDD<LongWritable, Text> deptinputLines =
>> jsc.newAPIHadoopFile(args[0], TextInputFormat.class,LongWritable.class,
>> Text.class, hadoopConf);
>> JavaRDD<Dept> deptRDD = deptinputLines.map(new
>> Function<Tuple2<LongWritable, Text>, String>() {
>> @Override
>> public String
>> call(Tuple2<LongWritable, Text> arg0) throws Exception {
>> return
>> arg0._2.toString();
>> }
>>
>> }).map(new Function<String,
>> Dept>() {
>>
>> public Dept call(String recordLine)
>> throws Exception {
>> String[] parts =
>> recordLine.split(GlobalSparkContext.recordSeparator);
>> return getInstanceDept(parts,
>> dept);
>> }
>> });
>>
>> DataFrame deptDF =
>> sqlContext.createDataFrame(deptRDD, Dept.class);
>> deptDF.registerTempTable("DEPT");
>> //deptDF.show();
>>
>> JavaPairRDD<LongWritable, Text> inputLines =
>> jsc.newAPIHadoopFile(args[1], TextInputFormat.class, LongWritable.class,
>> Text.class, hadoopConf);
>> JavaRDD<Emp> empRDD = inputLines.map(new
>> Function<Tuple2<LongWritable,
>> Text>, String>() {
>>
>> private static final long
>> serialVersionUID = 3371707560417405016L;
>>
>> @Override
>> public String
>> call(Tuple2<LongWritable, Text> arg0) throws Exception {
>> return
>> arg0._2.toString();
>> }
>>
>> }).map(new Function<String,
>> Emp>() {
>>
>> private static final long
>> serialVersionUID = 7656942162815285622L;
>>
>> public Emp call(String recordLine) throws
>> Exception {
>> String[] parts =
>> recordLine.split(GlobalSparkContext.recordSeparator);
>> return getInstance(parts, emp);
>> }
>> });
>> DataFrame empDF =
>> sqlContext.createDataFrame(empRDD, Emp.class);
>> empDF.registerTempTable("EMP");
>>
>> sqlContext.sql("SELECT * FROM EMP e LEFT OUTER JOIN
>> DEPT d ON e.deptid
>> = d.deptid").show();
>>
>>
>>
>> //empDF.join(deptDF,empDF.col("deptid").equalTo(deptDF.col("deptid")),"leftouter").show();;
>>
>> }
>> catch(Exception e){
>> System.out.println(e);
>> }
>> }
>> public static Emp getInstance(String[] parts, Emp emp) throws
>> ParseException {
>> emp.setId(parts[0]);
>> emp.setName(parts[1]);
>> emp.setDeptid(parts[2]);
>>
>> return emp;
>> }
>> public static Dept getInstanceDept(String[] parts, Dept dept)
>> throws
>> ParseException {
>> dept.setDeptid(parts[0]);
>> dept.setDeptname(parts[1]);
>> return dept;
>> }
>> }
>>
>> Input
>> Emp
>> 1001 aba 10
>> 1002 abs 20
>> 1003 abd 10
>> 1004 abf 30
>> 1005 abg 10
>> 1006 abh 20
>> 1007 abj 10
>> 1008 abk 30
>> 1009 abl 20
>> 1010 abq 10
>>
>> Dept
>> 10 dev
>> 20 Test
>> 30 IT
>>
>> Output
>> +------+------+----+------+--------+
>> |deptid| id|name|deptid|deptname|
>> +------+------+----+------+--------+
>> | 10| 1001| aba| 10| dev|
>> | 10| 1003| abd| 10| dev|
>> | 10| 1005| abg| 10| dev|
>> | 10| 1007| abj| 10| dev|
>> | 10| 1010| abq| 10| dev|
>> | 20| 1002| abs| null| null|
>> | 20| 1006| abh| null| null|
>> | 20| 1009| abl| null| null|
>> | 30| 1004| abf| null| null|
>> | 30| 1008| abk| null| null|
>> +------+------+----+------+--------+
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Left-outer-Join-issue-using-programmatic-sql-joins-tp27295.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: [email protected]
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>