Hi,

The way I see it, your dedup condition needs to be well defined. If it is variable, then the joining approach is no good either. You may want to stub columns (i.e., put a default value into the joining clause) to achieve this. If not, could you state the problem with all the other conditions so we can discuss it further?
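A rough sketch of the stubbing idea, using the s1/s2 DataFrames from your snippet below (assuming Spark 1.5+; REF5 and REF6 stand in for whichever columns are only conditionally part of your condition):

import org.apache.spark.sql.functions.{coalesce, col, lit}

// Give the conditionally used columns a fixed stub value so the join
// condition has the same shape for every record; rows where the column
// is absent/null then compare on the stub instead of dropping out.
val s1k = s1.withColumn("REF5_K", coalesce(col("REF5"), lit("NA")))
            .withColumn("REF6_K", coalesce(col("REF6"), lit("NA")))
val s2k = s2.withColumn("REF5_K", coalesce(col("REF5"), lit("NA")))
            .withColumn("REF6_K", coalesce(col("REF6"), lit("NA")))
// ...and then always include REF5_K/REF6_K in the joining clause.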
Getting a partition key upfront will be important in your case to control the shuffle.

Best
Ayan

On Sat, Oct 31, 2015 at 11:54 PM, Sarath Chandra <
sarathchandra.jos...@algofusiontech.com> wrote:

> Thanks for the reply Ayan.
>
> I got this idea earlier, but the problem is that the number of columns
> used for joining varies depending on some data conditions, and their
> data types differ as well. So I'm not seeing how to define the UDF,
> since we need to specify the argument count and types upfront.
>
> Any ideas how to tackle this?
>
> Regards,
> Sarath.
>
> On Sat, Oct 31, 2015 at 4:37 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Can this be a solution?
>>
>> 1. Write a function which takes a string and converts it to an md5 hash.
>> 2. From your base table, generate a string out of all the columns you
>> have used for joining. So records 1 and 4 should generate the same
>> hash value.
>> 3. Group by this new id (you have already linked the records) and pull
>> out the required fields.
>>
>> Please let the group know if it works...
>>
>> Best
>> Ayan
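In code, that suggestion might look roughly like this (a sketch, assuming Spark 1.5+, where concat_ws and md5 are available as column functions; on older versions the same thing can be done with a one-argument string UDF). Because concat_ws casts each column to string and the columns come from a Seq, it sidesteps both the varying-types and the varying-count problems:

import org.apache.spark.sql.functions.{col, concat_ws, md5, upper}

// Steps 1-2: build one string out of all the joining columns and hash it.
// upper() normalizes case differences such as 19-Oct-2015 vs 19-OCT-2015,
// assuming those dates are stored as strings.
val joinCols = Seq("TRN_NO", "DATE1", "DATE2", "BRANCH", "REF1", "REF2",
                   "REF3", "REF4", "REF5", "REF6", "CURRENCY")
val keyed = src.withColumn("LINK_ID",
  md5(upper(concat_ws("|", joinCols.map(col): _*))))

// Step 3: records sharing a LINK_ID are linked; group by it to pull out
// the required fields (count() here is just a stand-in).
val linked = keyed.groupBy("LINK_ID").count()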
>> On Sat, Oct 31, 2015 at 6:44 PM, Sarath Chandra <
>> sarathchandra.jos...@algofusiontech.com> wrote:
>>
>>> Hi All,
>>>
>>> I have a Hive table where data from 2 different sources (S1 and S2)
>>> gets accumulated. Sample data below:
>>>
>>> RECORD_ID|SOURCE_TYPE|TRN_NO|DATE1|DATE2|BRANCH|REF1|REF2|REF3|REF4|REF5|REF6|DC_FLAG|AMOUNT|CURRENCY
>>> 1|S1|55|19-Oct-2015|19-Oct-2015|25602|999|9999|41106|47311|379|99999|004|999|9999|Cr|2672.000000|INR
>>> 2|S1|55|19-Oct-2015|19-Oct-2015|81201|999|9999|41106|99999|379|99999|004|999|9999|Dr|2672.000000|INR
>>> 3|S2|55|19-OCT-2015|19-OCT-2015|81201|999|9999|41106|99999|379|99999|004|999|9999|DR|2672|INR
>>> 4|S2|55|19-OCT-2015|19-OCT-2015|25602|999|9999|41106|47311|379|99999|004|999|9999|CR|2672|INR
>>>
>>> I have a requirement to link similar records (same dates, branch and
>>> reference numbers) source-wise and assign them a unique ID linking the
>>> two records. For example, records 1 and 4 above should be linked with
>>> the same ID.
>>>
>>> I've written the code below to segregate the data source-wise and join
>>> the two sets on the similarities, but I don't know how to proceed
>>> further.
>>>
>>> var hc = new org.apache.spark.sql.hive.HiveContext(sc);
>>> var src = hc.sql("select RECORD_ID,SOURCE_TYPE,TRN_NO,DATE1,DATE2,BRANCH,REF1,REF2,REF3,REF4,REF5,REF6,DC_FLAG,AMOUNT,CURRENCY from src_table");
>>> var s1 = src.filter("source_type='S1'");
>>> var s2 = src.filter("source_type='S2'");
>>> var src_join = s1.as("S1").join(s2.as("S2")).filter(
>>>   "(S1.TRN_NO = S2.TRN_NO) and (S1.DATE1 = S2.DATE1) and (S1.DATE2 = S2.DATE2) and (S1.BRANCH = S2.BRANCH) and (S1.REF1 = S2.REF1) and (S1.REF2 = S2.REF2) and (S1.REF3 = S2.REF3) and (S1.REF4 = S2.REF4) and (S1.REF5 = S2.REF5) and (S1.REF6 = S2.REF6) and (S1.CURRENCY = S2.CURRENCY)");
>>>
>>> I tried using a UDF which returns a random value, or a hashed string
>>> built from the record IDs of both sides, and added it to the schema
>>> using withColumn, but ended up getting duplicate link IDs.
>>>
>>> Also, when I use a UDF I'm not able to refer to the columns using the
>>> alias in the next steps. For example, if I create a new DF using the
>>> line below:
>>>
>>> var src_link = src_join.as("SJ").withColumn("LINK_ID",
>>>   linkIDUDF(src_join("S1.RECORD_ID"), src_join("S2.RECORD_ID")));
>>>
>>> then in further lines I'm not able to refer to the "S1" columns from
>>> "src_link", like:
>>>
>>> var src_link_s1 = src_link.as("SL").select($"S1.RECORD_ID");
>>>
>>> Please guide me.
>>>
>>> Regards,
>>> Sarath.
>>
>> --
>> Best Regards,
>> Ayan Guha

--
Best Regards,
Ayan Guha
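On the aliasing problem in the last snippet: after withColumn the result is a new projection, and the S1/S2 qualifiers from the join may no longer resolve. One workaround (a sketch; linkIDUDF here is a hypothetical deterministic hash of both record IDs, which is safer than a random value, since non-deterministic UDFs can be re-evaluated) is to resolve and rename the qualified columns in the same select that adds LINK_ID, so later steps only see plain column names:

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical link-ID UDF: a deterministic MD5 hash of both record IDs
// (assumes string record IDs; cast first if they are numeric).
val linkIDUDF = udf((id1: String, id2: String) => {
  val md = java.security.MessageDigest.getInstance("MD5")
  md.digest(s"$id1|$id2".getBytes("UTF-8")).map("%02x".format(_)).mkString
})

// Rename the qualified columns while the S1/S2 aliases still resolve.
val src_link = src_join.select(
  col("S1.RECORD_ID").as("S1_RECORD_ID"),
  col("S2.RECORD_ID").as("S2_RECORD_ID"),
  linkIDUDF(col("S1.RECORD_ID"), col("S2.RECORD_ID")).as("LINK_ID"))

// Later steps refer to the renamed columns instead of the old aliases.
val src_link_s1 = src_link.select("S1_RECORD_ID", "LINK_ID")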