The following page has been changed by ShravanNarayanamurthy:
http://wiki.apache.org/pig/PigFRJoin

New page:

= Fragment Replicate Join =
Fragment Replicate Join (FRJ) is useful when we want to join a huge table with a very small table (small enough to fit in memory) and the join does not expand the data by much. The idea is to distribute the processing of the huge file by fragmenting it and replicating the small file to every machine that receives a fragment of the huge file. Because the entire small file is available on each machine, the join becomes a trivial task that needs no break in the pipeline.

== Performance ==
 1. Queries being compared
  1. {{{
A = load 'frag';
B = load 'repl' as (replicated/normal based on join type)
C = join A by $0, B by $0;
}}}
  2. {{{
A = load 'frag';
B = load 'repl' as (replicated/normal based on join type)
C = join A by $0, B by $0;
D = group C by $1;
E = foreach D generate group, COUNT(C);
}}}
 2. Frag & Repl Files
  1. Repl File has at most one tuple per key (shrinking or constant-size join){{{
2.1.*.1 Key Set of Frag = Key Set of Repl
2.1.*.2 (Key Set of Frag - Key Set of Repl) != {}
2.1.*.3 (Key Set of Repl - Key Set of Frag) != {}
2.1.#.1 Range of frag files: 20M to 20G, each file 10 times the size of the previous one
2.1.$.1 Range of repl files: starts at 2K and ends where the repl file is as large as the frag file, each file 10 times the size of the previous one
}}}
  2. Repl File has a lot of tuples per key (expanding join){{{
2.2.*.1 Key Set of Frag = Key Set of Repl
2.2.*.2 (Key Set of Frag - Key Set of Repl) != {}
2.2.*.3 (Key Set of Repl - Key Set of Frag) != {}
2.2.#.1 Range of frag files: 20M to 20G, each file 10 times the size of the previous one
2.2.$.1 Range of repl files: 2K to 2M, each file 10 times the size of the previous one
}}}
 3. Number of map & reduce slots available{{{
3.1 One preallocated cluster with an equal number of map & reduce slots, that number being the maximum number of tasks created by any job that will run on this cluster
3.2 One preallocated cluster with 0 reduce slots and a number of map slots equal to twice the maximum number of map tasks created by any map-only job that will run on this cluster (can run only map-only jobs)
}}}
 4. Number of map tasks created{{{
4.1 InputSplit logic unchanged
4.2 InputSplit logic changed to produce more map tasks than would otherwise have been created
}}}
 5. Number of reduce tasks created{{{
5.1 Number of reduce tasks = number of maps used
5.2 Number of reduce tasks is left unassigned, in which case it is set to 0.9 times the max reduce slots configured in the cluster
}}}

== Experiments ==
We compare the running times of FRJ implemented as a new operator against Symmetric Hash Join (the normal map-reduce join) and against a UDF implementation of FRJ. The changes to the logical side are as per JoinFramework. We distinguish joins whose result is larger than the input (Expanding Join) from those whose result is smaller (Reducing Join).
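
To make the expanding versus reducing distinction concrete, here is a minimal sketch of the fragment-replicate idea using plain Java collections rather than Pig's operators. The class `FragmentReplicateSketch`, its two methods, and the sample rows are hypothetical names chosen for illustration; it assumes rows are string arrays joined on field 0 and that the replicated input fits in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: not Pig's FRJ operator, just the same idea in plain Java.
public class FragmentReplicateSketch {

    // Load the small (replicated) input into memory, grouped by its join key (field 0).
    public static Map<String, List<String[]>> buildTable(List<String[]> repl) {
        Map<String, List<String[]>> table = new HashMap<>();
        for (String[] row : repl) {
            table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        return table;
    }

    // Stream one fragment of the large input past the table; no shuffle or sort is needed.
    public static List<String[]> joinFragment(List<String[]> fragment,
                                              Map<String, List<String[]>> table) {
        List<String[]> out = new ArrayList<>();
        for (String[] left : fragment) {
            List<String[]> matches = table.get(left[0]);
            if (matches == null) {
                continue; // key absent from the replicated input: no output row
            }
            for (String[] right : matches) {
                String[] joined = new String[left.length + right.length];
                System.arraycopy(left, 0, joined, 0, left.length);
                System.arraycopy(right, 0, joined, left.length, right.length);
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> frag = Arrays.asList(
            new String[]{"k1", "a"}, new String[]{"k2", "b"}, new String[]{"k3", "c"});

        // At most one tuple per key, and k3 is missing: the output cannot exceed the fragment.
        List<String[]> reducingRepl = Arrays.asList(
            new String[]{"k1", "x"}, new String[]{"k2", "y"});

        // Several tuples per key: each fragment row can yield several output rows.
        List<String[]> expandingRepl = Arrays.asList(
            new String[]{"k1", "x"}, new String[]{"k1", "y"}, new String[]{"k1", "z"},
            new String[]{"k2", "p"}, new String[]{"k2", "q"});

        System.out.println("reducing:  " + joinFragment(frag, buildTable(reducingRepl)).size());
        System.out.println("expanding: " + joinFragment(frag, buildTable(expandingRepl)).size());
    }
}
```

With three fragment rows, the first replicated input yields two joined rows and the second yields five, which is the difference the experiments separate into reducing and expanding joins.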
The following graphs show the performance of the various algorithms:

=== UDF used ===
{{{
public static class FRJoin extends EvalFunc<DataBag>{
    String repl;        // path of the replicated (small) file
    int keyField;       // never assigned, so the join key is field 0
    boolean isTblSetUp = false;
    // In-memory table: join key -> bag of all replicated tuples with that key
    Hashtable<DataByteArray, DataBag> replTbl = new Hashtable<DataByteArray, DataBag>();

    public FRJoin(){
    }

    public FRJoin(String repl){
        this.repl = repl;
    }

    @Override
    public DataBag exec(Tuple input) throws IOException {
        // Load the replicated file lazily, on the first tuple seen by this task
        if(!isTblSetUp){
            setUpHashTable();
            isTblSetUp = true;
        }
        try {
            DataByteArray key = (DataByteArray) input.get(keyField);
            // No match in the replicated file: return an empty bag
            if(!replTbl.containsKey(key)) return BagFactory.getInstance().newDefaultBag();
            return replTbl.get(key);
        } catch (ExecException e) {
            throw new IOException(e.getMessage());
        }
    }

    // Reads the whole replicated file with PigStorage and groups its tuples by key
    private void setUpHashTable() throws IOException {
        FileSpec replFile = new FileSpec(repl,new FuncSpec(PigStorage.class.getName()+"()"));
        POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile, false);
        PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
        try {
            pc.connect();
            ld.setPc(pc);
            Tuple dummyTuple = null;
            // Pull tuples from the loader until it signals end of input
            for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple)){
                Tuple tup = (Tuple) res.result;
                DataByteArray key = (DataByteArray)tup.get(keyField);
                DataBag vals = null;
                if(replTbl.containsKey(key)){
                    vals = replTbl.get(key);
                }
                else{
                    // First tuple for this key: create its bag
                    vals = BagFactory.getInstance().newDefaultBag();
                    replTbl.put(key, vals);
                }
                vals.add(tup);
            }
        } catch (ExecException e) {
            throw new IOException(e.getMessage());
        }
    }
}
}}}
