Author: zly Date: Thu Feb 9 01:34:12 2017 New Revision: 1782286 URL: http://svn.apache.org/viewvc?rev=1782286&view=rev Log: PIG-4891: Implement FR join by broadcasting small rdd not making more copies of data (Nandor via Liyun)
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Modified: pig/branches/spark/build.xml pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Modified: pig/branches/spark/build.xml URL: http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1782286&r1=1782285&r2=1782286&view=diff ============================================================================== --- pig/branches/spark/build.xml (original) +++ pig/branches/spark/build.xml Thu Feb 9 01:34:12 2017 @@ -1049,7 +1049,7 @@ <ant dir="${test.e2e.dir}" target="test-tez"/> </target> - <target name="test-e2e-spark" depends="jar, piggybank" description="run end-to-end tests in tez mode"> + <target name="test-e2e-spark" depends="jar, piggybank" description="run end-to-end tests in spark mode"> <ant dir="${test.e2e.dir}" target="test-spark"/> </target> Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1782286&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Thu Feb 9 01:34:12 2017 @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.data.DataType; +import org.apache.pig.data.SchemaTupleBackend; +import org.apache.pig.data.SchemaTupleClassGenerator; +import org.apache.pig.data.SchemaTupleFactory; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; + +import java.util.List; +import java.util.Map; + +public class POFRJoinSpark extends POFRJoin { + private static final Log log = LogFactory.getLog(POFRJoinSpark.class); + + private Map<String, List<Tuple>> broadcasts; + + public POFRJoinSpark(POFRJoin copy) throws ExecException { + super(copy); + } + + @Override + protected void setUpHashMap() throws ExecException { + log.info("Building replication hash table"); + + SchemaTupleFactory[] inputSchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length]; + SchemaTupleFactory[] keySchemaTupleFactories = new SchemaTupleFactory[inputSchemas.length]; + for (int i = 0; i < inputSchemas.length; i++) 
{ + addSchemaToFactories(inputSchemas[i], inputSchemaTupleFactories, i); + addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i); + } + + replicates[fragment] = null; + int i = -1; + long start = System.currentTimeMillis(); + for (int k = 0; k < inputSchemas.length; ++k) { + ++i; + + SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[i]; + SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i]; + + if (i == fragment) { + replicates[i] = null; + continue; + } + + TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory); + + log.debug("Completed setup. Trying to build replication hash table"); + List<Tuple> tuples = broadcasts.get(parentPlan.getPredecessors(this).get(i).getOperatorKey().toString()); + + POLocalRearrange localRearrange = LRs[i]; + + for (Tuple t : tuples) { + localRearrange.attachInput(t); + Result res = localRearrange.getNextTuple(); + if (getReporter() != null) { + getReporter().progress(); + } + Tuple tuple = (Tuple) res.result; + if (isKeyNull(tuple.get(1))) continue; + Tuple key = mTupleFactory.newTuple(1); + key.set(0, tuple.get(1)); + Tuple value = getValueTuple(localRearrange, tuple); + + if (replicate.get(key) == null) { + replicate.put(key, new POMergeJoin.TuplesToSchemaTupleList(1, inputSchemaTupleFactory)); + } + + replicate.get(key).add(value); + + } + replicates[i] = replicate; + } + long end = System.currentTimeMillis(); + log.debug("Hash Table built. 
Time taken: " + (end - start)); + } + + @Override + public String name() { + return getAliasString() + "FRJoinSpark[" + DataType.findTypeName(resultType) + + "]" + " - " + mKey.toString(); + } + + private void addSchemaToFactories(Schema schema, SchemaTupleFactory[] schemaTupleFactories, int index) { + if (schema != null) { + log.debug("Using SchemaTuple for FR Join Schema: " + schema); + schemaTupleFactories[index] = SchemaTupleBackend.newSchemaTupleFactory(schema, false, SchemaTupleClassGenerator.GenContext.FR_JOIN); + } + } + + public void attachInputs(Map<String, List<Tuple>> broadcasts) { + this.broadcasts = broadcasts; + } +} Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1782286&r1=1782285&r2=1782286&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Thu Feb 9 01:34:12 2017 @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -43,11 +42,12 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark; import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter; import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter; import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; import org.apache.pig.backend.hadoop.executionengine.spark.operator.POJoinGroupSpark; @@ -56,14 +56,12 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.tools.pigstats.spark.SparkPigStats; import org.apache.pig.tools.pigstats.spark.SparkStatsUtil; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.broadcast.Broadcast; import org.apache.spark.rdd.RDD; import com.google.common.collect.Lists; @@ -102,7 +100,6 @@ public class JobGraphBuilder extends Spa public void visitSparkOp(SparkOperator sparkOp) throws VisitorException { new PhyPlanSetter(sparkOp.physicalPlan).visit(); try { - setReplicationForFRJoin(sparkOp.physicalPlan); setReplicationForMergeJoin(sparkOp.physicalPlan); sparkOperToRDD(sparkOp); finishUDFs(sparkOp.physicalPlan); @@ -117,26 +114,8 @@ public class JobGraphBuilder extends Spa } } - private void setReplicationForFRJoin(PhysicalPlan plan) throws IOException { - List<Path> filesForMoreReplication = new ArrayList<Path>(); - List<POFRJoin> pofrJoins = 
PlanHelper.getPhysicalOperators(plan, POFRJoin.class); - if (pofrJoins.size() > 0) { - for (POFRJoin pofrJoin : pofrJoins) { - FileSpec[] fileSpecs = pofrJoin.getReplFiles(); - if (fileSpecs != null) { - for (int i = 0; i < fileSpecs.length; i++) { - if (i != pofrJoin.getFragment()) { - filesForMoreReplication.add(new Path(fileSpecs[i].getFileName())); - } - } - } - } - } - setReplicationForFiles(filesForMoreReplication); - } - private void setReplicationForMergeJoin(PhysicalPlan plan) throws IOException { - List<Path> filesForMoreReplication = new ArrayList<Path>(); + List<Path> filesForMoreReplication = new ArrayList<>(); List<POMergeJoin> poMergeJoins = PlanHelper.getPhysicalOperators(plan, POMergeJoin.class); if (poMergeJoins.size() > 0) { for (POMergeJoin poMergeJoin : poMergeJoins) { @@ -260,7 +239,6 @@ public class JobGraphBuilder extends Spa } } - if (physicalOperator instanceof POSplit) { List<PhysicalPlan> successorPlans = ((POSplit) physicalOperator).getPlans(); for (PhysicalPlan successPlan : successorPlans) { @@ -283,6 +261,11 @@ public class JobGraphBuilder extends Spa + physicalOperator.getClass().getSimpleName() + " " + physicalOperator); List<RDD<Tuple>> allPredRDDs = sortPredecessorRDDs(operatorKeysOfAllPreds); + + if (converter instanceof FRJoinConverter) { + setReplicatedInputs(physicalOperator, (FRJoinConverter) converter); + } + nextRDD = converter.convert(allPredRDDs, physicalOperator); if (nextRDD == null) { @@ -295,6 +278,16 @@ public class JobGraphBuilder extends Spa } } + private void setReplicatedInputs(PhysicalOperator physicalOperator, FRJoinConverter converter) { + Set<String> replicatedInputs = new HashSet<>(); + for (PhysicalOperator operator : physicalOperator.getInputs()) { + if (operator instanceof POBroadcastSpark) { + replicatedInputs.add(((POBroadcastSpark) operator).getBroadcastedVariableName()); + } + } + converter.setReplicatedInputs(replicatedInputs); + } + private List<PhysicalOperator> getPredecessors(PhysicalPlan plan, 
PhysicalOperator op) { List preds = null; if (!(op instanceof POJoinGroupSpark)) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1782286&r1=1782285&r2=1782286&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Feb 9 01:34:12 2017 @@ -51,10 +51,11 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; @@ -71,6 +72,7 @@ import org.apache.pig.backend.hadoop.exe import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; +import org.apache.pig.backend.hadoop.executionengine.spark.converter.BroadcastConverter; import org.apache.pig.backend.hadoop.executionengine.spark.converter.CollectedGroupConverter; import org.apache.pig.backend.hadoop.executionengine.spark.converter.CounterConverter; import org.apache.pig.backend.hadoop.executionengine.spark.converter.DistinctConverter; @@ -205,10 +207,11 @@ public class SparkLauncher extends Launc convertMap.put(POCounter.class, new CounterConverter()); convertMap.put(PORank.class, new RankConverter()); convertMap.put(POStream.class, new StreamConverter()); - convertMap.put(POFRJoin.class, new FRJoinConverter()); + convertMap.put(POFRJoinSpark.class, new FRJoinConverter()); convertMap.put(POMergeCogroup.class, new MergeCogroupConverter()); convertMap.put(POReduceBySpark.class, new ReduceByConverter()); convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter()); + convertMap.put(POBroadcastSpark.class, new BroadcastConverter(sparkContext)); uploadResources(sparkplan); new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1782286&r1=1782285&r2=1782286&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java (original) +++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java Thu Feb 9 01:34:12 2017 @@ -19,48 +19,56 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.io.Serializable; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; -import scala.Tuple2; -import scala.runtime.AbstractFunction1; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.rdd.RDD; -import com.google.common.base.Optional; - @SuppressWarnings("serial") public class FRJoinConverter implements RDDConverter<Tuple, Tuple, POFRJoin> { private static final Log LOG = LogFactory.getLog(FRJoinConverter.class); + private Set<String> replicatedInputs; + public RDD<Tuple> convert(List<RDD<Tuple>> predecessors, POFRJoin poFRJoin) throws IOException { - SparkUtil.assertPredecessorSize(predecessors, poFRJoin, 1); + SparkUtil.assertPredecessorSizeGreaterThan(predecessors, poFRJoin, 1); RDD<Tuple> rdd = predecessors.get(0); + + attachReplicatedInputs((POFRJoinSpark) poFRJoin); + FRJoinFunction frJoinFunction = new 
FRJoinFunction(poFRJoin); return rdd.toJavaRDD().mapPartitions(frJoinFunction, true).rdd(); } + private void attachReplicatedInputs(POFRJoinSpark poFRJoin) { + Map<String, List<Tuple>> replicatedInputMap = new HashMap<>(); + + for (String replicatedInput : replicatedInputs) { + replicatedInputMap.put(replicatedInput, SparkUtil.getBroadcastedVars().get(replicatedInput).value()); + } + + poFRJoin.attachInputs(replicatedInputMap); + } private static class FRJoinFunction implements FlatMapFunction<Iterator<Tuple>, Tuple>, Serializable { - private POFRJoin poFRJoin; + private POFRJoin poFRJoin; private FRJoinFunction(POFRJoin poFRJoin) { this.poFRJoin = poFRJoin; } @@ -92,5 +100,10 @@ public class FRJoinConverter implements } }; } + + } + + public void setReplicatedInputs(Set<String> replicatedInputs) { + this.replicatedInputs = replicatedInputs; } } \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1782286&r1=1782285&r2=1782286&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Thu Feb 9 01:34:12 2017 @@ -45,10 +45,12 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark; import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoinSpark; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; @@ -691,47 +693,29 @@ public class SparkCompiler extends PhyPl @Override public void visitFRJoin(POFRJoin op) throws VisitorException { - try { - FileSpec[] replFiles = new FileSpec[op.getInputs().size()]; - for (int i = 0; i < replFiles.length; i++) { - if (i == op.getFragment()) continue; - replFiles[i] = getTempFileSpec(); - } - op.setReplFiles(replFiles); - curSparkOp = phyToSparkOpMap.get(op.getInputs().get(op.getFragment())); + try { + curSparkOp = phyToSparkOpMap.get(op.getInputs().get(op.getFragment())); + for (int i = 0; i < compiledInputs.length; i++) { + SparkOperator sparkOperator = compiledInputs[i]; + if (curSparkOp.equals(sparkOperator)) { + continue; + } - //We create a sparkOperator to save the result of replicated file to the hdfs - //temporary file. 
We load the temporary file in POFRJoin#setUpHashMap - //More detail see PIG-4771 - for (int i = 0; i < compiledInputs.length; i++) { - SparkOperator sparkOp = compiledInputs[i]; - if (curSparkOp.equals(sparkOp)) { - continue; - } - POStore store = getStore(); - store.setSFile(replFiles[i]); - sparkOp.physicalPlan.addAsLeaf(store); - sparkPlan.connect(sparkOp, curSparkOp); - } + OperatorKey broadcastKey = new OperatorKey(scope, nig.getNextNodeId(scope)); + POBroadcastSpark poBroadcastSpark = new POBroadcastSpark(broadcastKey); + poBroadcastSpark.setBroadcastedVariableName(broadcastKey.toString()); - curSparkOp.physicalPlan.addAsLeaf(op); + sparkOperator.physicalPlan.addAsLeaf(poBroadcastSpark); + } - List<List<PhysicalPlan>> joinPlans = op.getJoinPlans(); - if (joinPlans != null) { - for (List<PhysicalPlan> joinPlan : joinPlans) { - if (joinPlan != null) { - for (PhysicalPlan plan : joinPlan) { - processUDFs(plan); - } - } - } - } - phyToSparkOpMap.put(op, curSparkOp); - } catch (Exception e) { - int errCode = 2034; - String msg = "Error compiling operator " + op.getClass().getSimpleName(); - throw new SparkCompilerException(msg, errCode, PigException.BUG, e); - } + POFRJoinSpark poFRJoinSpark = new POFRJoinSpark(op); + addToPlan(poFRJoinSpark); + phyToSparkOpMap.put(op, curSparkOp); + } catch (Exception e) { + int errCode = 2034; + String msg = "Error compiling operator " + op.getClass().getSimpleName(); + throw new SparkCompilerException(msg, errCode, PigException.BUG, e); + } } @Override