Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Mon May 29 15:00:39 2017 @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.plan; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.impl.plan.Operator; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.MultiMap; + +/** + * An operator model for a Spark job. 
Acts as a host to the plans that will + * execute in spark. + */ +public class SparkOperator extends Operator<SparkOpPlanVisitor> { + private static enum OPER_FEATURE { + NONE, + // Indicate if this job is a sampling job + SAMPLER, + // Indicate if this job is a merge indexer + INDEXER, + // Indicate if this job is a group by job + GROUPBY, + // Indicate if this job is a cogroup job + COGROUP, + // Indicate if this job is a regular join job + HASHJOIN, + // Indicate if this job is a union job + UNION, + // Indicate if this job is a native job + NATIVE, + // Indicate if this job is a limit job + LIMIT, + // Indicate if this job is a limit job after sort + LIMIT_AFTER_SORT; + }; + + public PhysicalPlan physicalPlan; + + public Set<String> UDFs; + + /* Name of the Custom Partitioner used */ + public String customPartitioner = null; + + public Set<PhysicalOperator> scalars; + + public int requestedParallelism = -1; + + private BitSet feature = new BitSet(); + + private boolean splitter = false; + + // Name of the partition file generated by sampling process, + // Used by Skewed Join + private String skewedJoinPartitionFile; + + private boolean usingTypedComparator = false; + + private boolean combineSmallSplits = true; + + private List<String> crossKeys = null; + + private MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionMap = new MultiMap<OperatorKey, OperatorKey>(); + + // Indicates if a UDF comparator is used + boolean isUDFComparatorUsed = false; + + //The quantiles file name if globalSort is true + private String quantFile; + + //Indicates if this job is an order by job + private boolean globalSort = false; + + public SparkOperator(OperatorKey k) { + super(k); + physicalPlan = new PhysicalPlan(); + UDFs = new HashSet<String>(); + scalars = new HashSet<PhysicalOperator>(); + } + + @Override + public boolean supportsMultipleInputs() { + return true; + } + + @Override + public boolean supportsMultipleOutputs() { + return true; + } + + @Override + public 
String name() { + String udfStr = getUDFsAsStr(); + StringBuilder sb = new StringBuilder("Spark" + "(" + + requestedParallelism + (udfStr.equals("") ? "" : ",") + + udfStr + ")" + " - " + mKey.toString()); + return sb.toString(); + } + + private String getUDFsAsStr() { + StringBuilder sb = new StringBuilder(); + if (UDFs != null && UDFs.size() > 0) { + for (String str : UDFs) { + sb.append(str.substring(str.lastIndexOf('.') + 1)); + sb.append(','); + } + sb.deleteCharAt(sb.length() - 1); + } + return sb.toString(); + } + + public void add(PhysicalOperator physicalOper) { + this.physicalPlan.add(physicalOper); + } + + @Override + public void visit(SparkOpPlanVisitor v) throws VisitorException { + v.visitSparkOp(this); + } + + public void addCrossKey(String key) { + if (crossKeys == null) { + crossKeys = new ArrayList<String>(); + } + crossKeys.add(key); + } + + public List<String> getCrossKeys() { + return crossKeys; + } + + public boolean isGroupBy() { + return feature.get(OPER_FEATURE.GROUPBY.ordinal()); + } + + public void markGroupBy() { + feature.set(OPER_FEATURE.GROUPBY.ordinal()); + } + + public boolean isCogroup() { + return feature.get(OPER_FEATURE.COGROUP.ordinal()); + } + + public void markCogroup() { + feature.set(OPER_FEATURE.COGROUP.ordinal()); + } + + public boolean isRegularJoin() { + return feature.get(OPER_FEATURE.HASHJOIN.ordinal()); + } + + public void markRegularJoin() { + feature.set(OPER_FEATURE.HASHJOIN.ordinal()); + } + + public int getRequestedParallelism() { + return requestedParallelism; + } + + public void setSplitter(boolean spl) { + splitter = spl; + } + + public boolean isSplitter() { + return splitter; + } + + public boolean isSampler() { + return feature.get(OPER_FEATURE.SAMPLER.ordinal()); + } + + public void markSampler() { + feature.set(OPER_FEATURE.SAMPLER.ordinal()); + } + + public void setSkewedJoinPartitionFile(String file) { + skewedJoinPartitionFile = file; + } + + public String getSkewedJoinPartitionFile() { + return 
skewedJoinPartitionFile; + } + + protected boolean usingTypedComparator() { + return usingTypedComparator; + } + + protected void useTypedComparator(boolean useTypedComparator) { + this.usingTypedComparator = useTypedComparator; + } + + protected void noCombineSmallSplits() { + combineSmallSplits = false; + } + + public boolean combineSmallSplits() { + return combineSmallSplits; + } + + public boolean isIndexer() { + return feature.get(OPER_FEATURE.INDEXER.ordinal()); + } + + public void markIndexer() { + feature.set(OPER_FEATURE.INDEXER.ordinal()); + } + public boolean isUnion() { + return feature.get(OPER_FEATURE.UNION.ordinal()); + } + + public void markUnion() { + feature.set(OPER_FEATURE.UNION.ordinal()); + } + + public boolean isNative() { + return feature.get(OPER_FEATURE.NATIVE.ordinal()); + } + + public void markNative() { + feature.set(OPER_FEATURE.NATIVE.ordinal()); + } + + public boolean isLimit() { + return feature.get(OPER_FEATURE.LIMIT.ordinal()); + } + + public void markLimit() { + feature.set(OPER_FEATURE.LIMIT.ordinal()); + } + + public boolean isLimitAfterSort() { + return feature.get(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal()); + } + + public void markLimitAfterSort() { + feature.set(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal()); + } + + public void copyFeatures(SparkOperator copyFrom, List<OPER_FEATURE> excludeFeatures) { + for (OPER_FEATURE opf : OPER_FEATURE.values()) { + if (excludeFeatures != null && excludeFeatures.contains(opf)) { + continue; + } + if (copyFrom.feature.get(opf.ordinal())) { + feature.set(opf.ordinal()); + } + } + } + + public boolean isSkewedJoin() { + return (skewedJoinPartitionFile != null); + } + + public void setRequestedParallelism(int requestedParallelism) { + this.requestedParallelism = requestedParallelism; + } + + public void setRequestedParallelismByReference(SparkOperator oper) { + this.requestedParallelism = oper.requestedParallelism; + } + + //If enable multiquery optimizer, in some cases, the predecessor(from) of a 
physicalOp(to) will be the leaf physicalOperator of + //previous sparkOperator. For more detail see PIG-4675 + public void addMultiQueryOptimizeConnectionItem(OperatorKey to, OperatorKey from) { + multiQueryOptimizeConnectionMap.put(to, from); + } + + public MultiMap<OperatorKey, OperatorKey> getMultiQueryOptimizeConnectionItem() { + return multiQueryOptimizeConnectionMap; + } + + public void setGlobalSort(boolean globalSort) { + this.globalSort = globalSort; + } + + public boolean isGlobalSort() { + return globalSort; + } + +}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Mon May 29 15:00:39 2017 @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.plan; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.plan.optimizer.OptimizerException; +import org.apache.pig.impl.util.Pair; + +/** + * This visitor visits the SparkPlan and does the following for each + * SparkOperator - visits the POPackage in the plan and finds the corresponding + * POLocalRearrange(s). 
It then annotates the POPackage with information about + * which columns in the "value" are present in the "key" and will need to + * be stitched in to the "value" + */ +public class SparkPOPackageAnnotator extends SparkOpPlanVisitor { + private static final Log LOG = LogFactory.getLog(SparkPOPackageAnnotator.class); + + public SparkPOPackageAnnotator(SparkOperPlan plan) { + super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); + } + + @Override + public void visitSparkOp(SparkOperator sparkOp) throws VisitorException { + if (!sparkOp.physicalPlan.isEmpty()) { + PackageDiscoverer pkgDiscoverer = new PackageDiscoverer( + sparkOp.physicalPlan); + pkgDiscoverer.visit(); + } + } + + static class PackageDiscoverer extends PhyPlanVisitor { + private POPackage pkg; + private PhysicalPlan plan; + + public PackageDiscoverer(PhysicalPlan plan) { + super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( + plan)); + this.plan = plan; + } + + @Override + public void visitPackage(POPackage pkg) throws VisitorException { + this.pkg = pkg; + + // Find POLocalRearrange(s) corresponding to this POPackage + PhysicalOperator graOp = plan.getPredecessors(pkg).get(0); + if (! (graOp instanceof POGlobalRearrange)) { + throw new OptimizerException("Package operator is not preceded by " + + "GlobalRearrange operator in Spark Plan", 2087, PigException.BUG); + } + + List<PhysicalOperator> lraOps = plan.getPredecessors(graOp); + if (pkg.getNumInps() != lraOps.size()) { + throw new OptimizerException("Unexpected problem during optimization. " + + "Could not find all LocalRearrange operators. Expected " + pkg.getNumInps() + + ". Got " + lraOps.size() + ".", 2086, PigException.BUG); + } + Collections.sort(lraOps); + for (PhysicalOperator op : lraOps) { + if (! 
(op instanceof POLocalRearrange)) { + throw new OptimizerException("GlobalRearrange operator can only be preceded by " + + "LocalRearrange operator(s) in Spark Plan", 2087, PigException.BUG); + } + annotatePkgWithLRA((POLocalRearrange)op); + } + }; + + private void annotatePkgWithLRA(POLocalRearrange lrearrange) + throws VisitorException { + + Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo; + if (LOG.isDebugEnabled()) + LOG.debug("Annotating package " + pkg + " with localrearrange operator " + + lrearrange + " with index " + lrearrange.getIndex()); + + if (pkg.getPkgr() instanceof LitePackager) { + if (lrearrange.getIndex() != 0) { + throw new VisitorException( + "POLocalRearrange for POPackageLite cannot have index other than 0, but has index - " + + lrearrange.getIndex()); + } + } + + // annotate the package with information from the LORearrange + // update the keyInfo information if already present in the + // POPackage + keyInfo = pkg.getPkgr().getKeyInfo(); + if (keyInfo == null) + keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); + + if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) { + // something is wrong - we should not be getting key info + // for the same index from two different Local Rearranges + int errCode = 2087; + String msg = "Unexpected problem during optimization." 
+ + " Found index:" + lrearrange.getIndex() + + " in multiple LocalRearrange operators."; + throw new OptimizerException(msg, errCode, PigException.BUG); + + } + keyInfo.put( + Integer.valueOf(lrearrange.getIndex()), + new Pair<Boolean, Map<Integer, Integer>>(lrearrange + .isProjectStar(), lrearrange.getProjectedColsMap())); + if (LOG.isDebugEnabled()) + LOG.debug("KeyInfo for packager for package operator " + pkg + " is " + + keyInfo ); + pkg.getPkgr().setKeyInfo(keyInfo); + pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); + pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); + } + } +} \ No newline at end of file Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.plan; + +import java.io.PrintStream; +import java.util.List; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; +import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.VisitorException; + +/** + * A visitor mechanism printing out the Spark plan. + */ +public class SparkPrinter extends SparkOpPlanVisitor { + + private PrintStream mStream = null; + private boolean isVerbose = true; + + public SparkPrinter(PrintStream ps, SparkOperPlan plan) { + super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); + mStream = ps; + mStream.println("#--------------------------------------------------"); + mStream.println("# Spark Plan"); + mStream.println("#--------------------------------------------------"); + } + + public void setVerbose(boolean verbose) { + isVerbose = verbose; + } + + @Override + public void visitSparkOp(SparkOperator sparkOp) throws VisitorException { + mStream.println(""); + mStream.println("Spark node " + sparkOp.getOperatorKey().toString()); + if (sparkOp instanceof NativeSparkOperator) { + mStream.println(((NativeSparkOperator)sparkOp).getCommandString()); + mStream.println("--------"); + mStream.println(); + return; + } + if (sparkOp.physicalPlan != null && sparkOp.physicalPlan.size() > 0) { + PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>( + sparkOp.physicalPlan, mStream); + printer.setVerbose(isVerbose); + 
printer.visit(); + mStream.println("--------"); + } + List<POGlobalRearrangeSpark> glrList = PlanHelper.getPhysicalOperators(sparkOp.physicalPlan, POGlobalRearrangeSpark.class); + for (POGlobalRearrangeSpark glr : glrList) { + if (glr.isUseSecondaryKey()) { + mStream.println("POGlobalRearrange(" + glr.getOperatorKey() + ") uses secondaryKey"); + } + } + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.spark.plan; + +import java.io.PrintStream; +import java.io.StringWriter; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.XMLPhysicalPlanPrinter; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator; +import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.VisitorException; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +import static javax.xml.transform.OutputKeys.INDENT; +import static javax.xml.transform.OutputKeys.OMIT_XML_DECLARATION; + + +public class XMLSparkPrinter extends SparkOpPlanVisitor { + + private PrintStream mStream = null; + + private Document doc = null; + private Element root = null; + + public XMLSparkPrinter(PrintStream ps, SparkOperPlan plan) throws ParserConfigurationException { + super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan)); + mStream = ps; + DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + DocumentBuilder builder = factory.newDocumentBuilder(); + this.doc = builder.newDocument(); + this.root = this.doc.createElement("sparkPlan"); + this.doc.appendChild(this.root); + + } + + + public void closePlan() throws TransformerException { + TransformerFactory factory = TransformerFactory.newInstance(); + Transformer transformer = factory.newTransformer(); + transformer.setOutputProperty(OMIT_XML_DECLARATION, "yes"); + transformer.setOutputProperty(INDENT, "yes"); + 
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2"); + + StringWriter sw = new StringWriter(); + StreamResult result = new StreamResult(sw); + DOMSource source = new DOMSource(doc); + transformer.transform(source, result); + mStream.println(sw.toString()); + } + + @Override + public void visitSparkOp(SparkOperator so) throws VisitorException { + Element sparkNode = doc.createElement("sparkNode"); + sparkNode.setAttribute("scope", "" + so.getOperatorKey().id); + if(so instanceof NativeSparkOperator) { + Element nativeSparkOper = doc.createElement("nativeSpark"); + nativeSparkOper.setTextContent(((NativeSparkOperator)so).getCommandString()); + sparkNode.appendChild(nativeSparkOper); + root.appendChild(sparkNode); + return; + } + + if (so.physicalPlan != null && so.physicalPlan.size() > 0) { + XMLPhysicalPlanPrinter<PhysicalPlan> printer = new XMLPhysicalPlanPrinter<>(so.physicalPlan, doc, sparkNode); + printer.visit(); + } + + root.appendChild(sparkNode); + + } + +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Mon May 29 15:00:39 2017 @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.running; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigRecordReader; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigSplit; +import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; +import org.apache.pig.data.SchemaTupleBackend; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.UDFContext; +import 
org.apache.pig.tools.pigstats.spark.SparkCounters; + +public class PigInputFormatSpark extends PigInputFormat { + + @Override + public RecordReader<Text, Tuple> createRecordReader(InputSplit split, TaskAttemptContext context) throws + IOException, InterruptedException { + resetUDFContext(); + //PigSplit#conf is the default hadoop configuration, we need to get the configuration + //from context.getConfiguration() to retrieve pig properties + PigSplit pigSplit = ((SparkPigSplit) split).getWrappedPigSplit(); + Configuration conf = context.getConfiguration(); + pigSplit.setConf(conf); + //Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX) + //which will be used in POMergeCogroup#setup + if (PigMapReduce.sJobContext == null) { + PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID()); + } + PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex()); + // Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and + // SchemaTupleBackend by reading properties from JobConf + initialize(conf); + + SparkRecordReaderFactory sparkRecordReaderFactory = new SparkRecordReaderFactory(pigSplit, context); + return sparkRecordReaderFactory.createRecordReader(); + } + + /** + * This is where we have to wrap PigSplits into SparkPigSplits + * @param jobcontext + * @return + * @throws IOException + * @throws InterruptedException + */ + @Override + public List<InputSplit> getSplits(JobContext jobcontext) throws IOException, InterruptedException { + List<InputSplit> sparkPigSplits = new ArrayList<>(); + List<InputSplit> originalSplits = super.getSplits(jobcontext); + + boolean isFileSplits = true; + for (InputSplit inputSplit : originalSplits) { + PigSplit split = (PigSplit)inputSplit; + if (!(split.getWrappedSplit() instanceof FileSplit)) { + isFileSplits = false; + break; + } + } + + for (InputSplit inputSplit : originalSplits) { 
+ PigSplit split = (PigSplit) inputSplit; + if (!isFileSplits) { + sparkPigSplits.add(new SparkPigSplit.GenericSparkPigSplit(split)); + } else { + sparkPigSplits.add(new SparkPigSplit.FileSparkPigSplit(split)); + } + } + + return sparkPigSplits; + } + + private void initialize(Configuration jobConf) throws IOException { + MapRedUtil.setupUDFContext(jobConf); + PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext")); + SchemaTupleBackend.initialize(jobConf, pc); + PigMapReduce.sJobConfInternal.set(jobConf); + PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); + pigHadoopLogger.setAggregate("true".equalsIgnoreCase(jobConf.get("aggregate.warning"))); + pigHadoopLogger.setReporter((SparkCounters)ObjectSerializer.deserialize(jobConf.get("pig.spark.counters"))); + PhysicalOperator.setPigLogger(pigHadoopLogger); + } + + private void resetUDFContext() { + UDFContext.getUDFContext().reset(); + } + + + static class SparkRecordReaderFactory extends PigInputFormat.RecordReaderFactory { + + public SparkRecordReaderFactory(InputSplit split, TaskAttemptContext context) throws IOException { + super(split, context); + } + + @Override + public RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException { + return new SparkPigRecordReader(inputFormat, pigSplit, loadFunc, context, limit); + } + } +} \ No newline at end of file Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java Mon May 29 15:00:39 
2017 @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.spark.streaming; + +import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager; +import org.apache.spark.TaskContext; + +public class SparkExecutableManager extends HadoopExecutableManager { + @Override + protected boolean writeErrorToHDFS(int limit, String taskId) { + if (command.getPersistStderr()) { + int tipId = TaskContext.get().attemptNumber(); + return tipId < command.getLogFilesLimit(); + } + return false; + } +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java Mon May 29 15:00:39 2017 @@ -37,6 +37,6 @@ public class 
AccumulatorOptimizer extend @Override public void visitTezOp(TezOperator tezOp) throws VisitorException { - AccumulatorOptimizerUtil.addAccumulator(tezOp.plan); + AccumulatorOptimizerUtil.addAccumulator(tezOp.plan, tezOp.plan.getRoots()); } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Mon May 29 15:00:39 2017 @@ -102,7 +102,8 @@ public class SecondaryKeyOptimizerTez ex rearrangePlan = PlanHelper.getLocalRearrangePlanFromSplit(from.plan, connectingLR.getOperatorKey()); } - SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(rearrangePlan, to.plan); + SecondaryKeyOptimizerUtil secondaryKeyOptUtil = new SecondaryKeyOptimizerUtil(); + SecondaryKeyOptimizerInfo info = secondaryKeyOptUtil.applySecondaryKeySort(rearrangePlan, to.plan); if (info != null) { numSortRemoved += info.getNumSortRemoved(); numDistinctChanged += info.getNumDistinctChanged(); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original) +++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Mon May 29 15:00:39 2017 @@ -37,11 +37,15 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.VisitorException; public class AccumulatorOptimizerUtil { private static final Log LOG = LogFactory.getLog(AccumulatorOptimizerUtil.class); @@ -57,9 +61,8 @@ public class AccumulatorOptimizerUtil { return batchSize; } - public static void addAccumulator(PhysicalPlan plan) { + public static void addAccumulator(PhysicalPlan plan, List<PhysicalOperator> pos) { // See if this is a map-reduce job - List<PhysicalOperator> pos = plan.getRoots(); if (pos == null || pos.size() == 0) { return; } @@ -286,4 +289,4 @@ public class AccumulatorOptimizerUtil { return false; } -} +} \ No newline at end of file Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Mon May 29 15:00:39 2017 @@ -333,7 +333,7 @@ public class CombinerOptimizerUtil { * @return null if plan is not combinable, otherwise list of combinable operators * @throws VisitorException */ - private static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners) + public static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners) throws VisitorException { List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = Lists.newArrayList(); @@ -447,7 +447,7 @@ public class CombinerOptimizerUtil { * @param keyType type for group-by key * @return new POForeach */ - private static POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) { + public static POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) { String scope = foreach.getOperatorKey().scope; POForEach newFE = new POForEach(createOperatorKey(scope), new ArrayList<PhysicalPlan>()); newFE.addOriginalLocation(foreach.getAlias(), foreach.getOriginalLocations()); @@ -471,7 +471,7 @@ public class CombinerOptimizerUtil { * @throws CloneNotSupportedException * @throws PlanException */ - private static PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan) + public static PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan) throws CloneNotSupportedException, PlanException { PhysicalPlan newplan = new PhysicalPlan(); addPredecessorsToPlan(algeOp, pplan, newplan); @@ -508,7 +508,7 @@ public class CombinerOptimizerUtil { * 
@throws CloneNotSupportedException * @throws PlanException */ - private static void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos) + public static void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos) throws CloneNotSupportedException, PlanException { // an array that we will first populate with physical operators in order // of their position in input. Used while adding plans to combine @@ -578,7 +578,7 @@ public class CombinerOptimizerUtil { * @param rearrange * @return */ - private static POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) { + public static POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) { String scope = rearrange.getOperatorKey().scope; POPreCombinerLocalRearrange pclr = new POPreCombinerLocalRearrange( createOperatorKey(scope), @@ -587,7 +587,7 @@ public class CombinerOptimizerUtil { return pclr; } - private static OperatorKey createOperatorKey(String scope) { + public static OperatorKey createOperatorKey(String scope) { return new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope)); } @@ -619,7 +619,7 @@ public class CombinerOptimizerUtil { * @param type * @throws PlanException */ - private static void changeFunc(POForEach fe, byte type) throws PlanException { + public static void changeFunc(POForEach fe, byte type) throws PlanException { for (PhysicalPlan plan : fe.getInputPlans()) { List<PhysicalOperator> leaves = plan.getLeaves(); if (leaves == null || leaves.size() != 1) { @@ -657,7 +657,7 @@ public class CombinerOptimizerUtil { * @throws PlanException * @throws CloneNotSupportedException */ - private static POLocalRearrange getNewRearrange(POLocalRearrange rearrange) + public static POLocalRearrange getNewRearrange(POLocalRearrange rearrange) throws PlanException, CloneNotSupportedException { POLocalRearrange newRearrange = rearrange.clone(); @@ -835,7 +835,7 @@ public class CombinerOptimizerUtil { * 
with * POUserFunc(org.apache.pig.builtin.Distinct)[DataBag] */ - private static class DistinctPatcher extends PhyPlanVisitor { + public static class DistinctPatcher extends PhyPlanVisitor { private POUserFunc distinct = null; /** * @param plan @@ -901,12 +901,12 @@ public class CombinerOptimizerUtil { } } - POUserFunc getDistinct() { + public POUserFunc getDistinct() { return distinct; } } - private static class fixMapProjects extends PhyPlanVisitor { + public static class fixMapProjects extends PhyPlanVisitor { public fixMapProjects(PhysicalPlan plan) { this(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Mon May 29 15:00:39 2017 @@ -35,12 +35,15 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.data.DataType; import org.apache.pig.impl.io.PigNullableWritable; @@ -54,7 +57,7 @@ import org.apache.pig.impl.plan.VisitorE public class SecondaryKeyOptimizerUtil { private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName()); - private SecondaryKeyOptimizerUtil() { + public SecondaryKeyOptimizerUtil() { } @@ -182,7 +185,7 @@ public class SecondaryKeyOptimizerUtil { return result; } - public static SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException { + public SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException { log.trace("Entering SecondaryKeyOptimizerUtil.addSecondaryKeySort"); SecondaryKeyOptimizerInfo secKeyOptimizerInfo = new SecondaryKeyOptimizerInfo(); List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>(); @@ -241,14 +244,11 @@ public class SecondaryKeyOptimizerUtil { } PhysicalOperator root = reduceRoots.get(0); - if (!(root instanceof POPackage)) { - log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing"); - return null; - } + PhysicalOperator currentNode = getCurrentNode(root,reducePlan); // visit the POForEach of the reduce plan. 
We can have Limit and Filter // in the middle - PhysicalOperator currentNode = root; + POForEach foreach = null; while (currentNode != null) { if (currentNode instanceof POPackage @@ -402,12 +402,33 @@ public class SecondaryKeyOptimizerUtil { throw new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode); } } - POPackage pack = (POPackage) root; - pack.getPkgr().setUseSecondaryKey(true); + + if (root instanceof POGlobalRearrangeSpark) { + POGlobalRearrangeSpark plg = (POGlobalRearrangeSpark) root; + plg.setUseSecondaryKey(true); + plg.setSecondarySortOrder(secondarySortKeyInfo.getAscs()); + } else if (root instanceof POPackage) { + POPackage pack = (POPackage) root; + pack.getPkgr().setUseSecondaryKey(true); + } else if (root instanceof POReduceBySpark) { + POReduceBySpark reduceBySpark = (POReduceBySpark) root; + reduceBySpark.setUseSecondaryKey(true); + reduceBySpark.setSecondarySortOrder(secondarySortKeyInfo.getAscs()); + } } return secKeyOptimizerInfo; } + protected PhysicalOperator getCurrentNode(PhysicalOperator root, PhysicalPlan reducePlan) { + PhysicalOperator currentNode = null; + if (!(root instanceof POPackage)) { + log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing"); + } else { + currentNode = root; + } + return currentNode; + } + private static void setSecondaryPlan(PhysicalPlan plan, POLocalRearrange rearrange, SortKeyInfo secondarySortKeyInfo) throws VisitorException { // Put plan to project secondary key to the POLocalRearrange @@ -663,5 +684,4 @@ public class SecondaryKeyOptimizerUtil { } return false; } - } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) 
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Mon May 29 15:00:39 2017 @@ -301,8 +301,15 @@ public class HBaseStorage extends LoadFu } columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_); - - String defaultCaster = UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER); + //In mr, UDFContext.deserialize is first called and then UDFContext.getUDFContext().getClientSystemProps() is called, + //the value is not null. + //In spark mode, when spark executor first initializes all + //the object,UDFContext.getUDFContext().getClientSystemProps() is null and then UDFContext.deserialize is called. + //so we need check whether UDFContext.getUDFContext().getClientSystemProps() + //is null or not, if is null, defaultCaster =STRING_CASTER, otherwise is + //UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER) + //Detail see PIG-4920 + String defaultCaster = UDFContext.getUDFContext().getClientSystemProps() != null ? 
UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER) : STRING_CASTER; String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster); if (STRING_CASTER.equalsIgnoreCase(casterOption)) { caster_ = new Utf8StorageConverter(); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Mon May 29 15:00:39 2017 @@ -172,7 +172,7 @@ public class HadoopExecutableManager ext * @return <code>true</code> if stderr data of task should be persisted on * HDFS, <code>false</code> otherwise */ - private boolean writeErrorToHDFS(int limit, String taskId) { + protected boolean writeErrorToHDFS(int limit, String taskId) { if (command.getPersistStderr() && taskId != null) { int tipId = TaskAttemptID.forName(taskId).getTaskID().getId(); return tipId < command.getLogFilesLimit(); Modified: pig/trunk/src/org/apache/pig/data/SelfSpillBag.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SelfSpillBag.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/data/SelfSpillBag.java (original) +++ pig/trunk/src/org/apache/pig/data/SelfSpillBag.java Mon May 29 15:00:39 2017 @@ -22,6 +22,8 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; +import java.io.Serializable; + /** * Class to hold code common to self spilling bags such as InternalCachedBag */ @@ 
-29,6 +31,7 @@ import org.apache.pig.classification.Int @InterfaceStability.Evolving public abstract class SelfSpillBag extends DefaultAbstractBag { private static final long serialVersionUID = 1L; + // SelfSpillBag$MemoryLimits is not serializable protected MemoryLimits memLimit; public SelfSpillBag(int bagCount) { @@ -47,10 +50,11 @@ public abstract class SelfSpillBag exten * The number of objects that will fit into this memory limit is computed * using the average memory size of the objects whose size is given to this * class. + * In spark mode, MemoryLimits needs implement Serializable interface otherwise NotSerializableExecption will be thrown (See PIG-4611) */ @InterfaceAudience.Private @InterfaceStability.Evolving - public static class MemoryLimits { + public static class MemoryLimits implements Serializable { private long maxMemUsage; private long cacheLimit = Integer.MAX_VALUE; Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/PigContext.java (original) +++ pig/trunk/src/org/apache/pig/impl/PigContext.java Mon May 29 15:00:39 2017 @@ -17,6 +17,9 @@ */ package org.apache.pig.impl; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; + import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; @@ -26,6 +29,8 @@ import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; import java.io.StringWriter; import java.lang.reflect.Constructor; @@ -906,5 +911,5 @@ public class PigContext implements Seria } else { classloader = new ContextClassLoader(cl); } - } + } } Modified: 
pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java (original) +++ pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java Mon May 29 15:00:39 2017 @@ -242,8 +242,11 @@ public class StreamingUDF extends EvalFu } InputStream udfFileStream = this.getClass().getResourceAsStream( absolutePath + getUserFileExtension()); - command[PATH_TO_FILE_CACHE] = "\"" + userUdfFile.getParentFile().getAbsolutePath() - + "\""; + if (udfFileStream == null) { + //Try loading the script from other locally available jars (needed for Spark mode) + udfFileStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(filePath+getUserFileExtension()); + } + command[PATH_TO_FILE_CACHE] = "\"" + userUdfFile.getParentFile().getAbsolutePath() + "\""; try { FileUtils.copyInputStreamToFile(udfFileStream, userUdfFile); Modified: pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original) +++ pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Mon May 29 15:00:39 2017 @@ -534,6 +534,24 @@ public abstract class OperatorPlan<E ext connect(oper, leaf); } } + + /** + * Adds the root operator to the plan and connects + * all existing roots the new root + * + * @param root + * @throws PlanException + */ + public void addAsRoot(E root) throws PlanException { + List<E> oldRoots = new ArrayList<E>(); + for (E operator : getRoots()) { + oldRoots.add(operator); + } + add(root); + for (E oper : oldRoots) { + connect(root, 
oper); + } + } public boolean isSingleLeafPlan() { List<E> tmpList = getLeaves() ; Modified: pig/trunk/src/org/apache/pig/impl/util/UDFContext.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Mon May 29 15:00:39 2017 @@ -23,11 +23,14 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Properties; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; public class UDFContext { + private static final Log LOG = LogFactory.getLog(UDFContext.class); private Configuration jconf = null; private HashMap<UDFContextKey, Properties> udfConfs; private Properties clientSysProps; @@ -76,7 +79,7 @@ public class UDFContext { /* * internal pig use only - should NOT be called from user code */ - HashMap<UDFContextKey, Properties> getUdfConfs() { + public HashMap<UDFContextKey, Properties> getUdfConfs() { return udfConfs; } @@ -204,6 +207,17 @@ public class UDFContext { conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps)); } + /* + * Internal pig use + */ + public String serialize() { + try { + return ObjectSerializer.serialize(udfConfs); + } catch (IOException e) { + LOG.error("UDFContext#serialize throws error ",e); + return null; + } + } /** * Populate the udfConfs field. 
This function is intended to @@ -218,6 +232,14 @@ public class UDFContext { jconf.get(CLIENT_SYS_PROPS)); } + public void deserializeForSpark(String udfConfsStr, String clientSysPropsStr) throws IOException { + if( udfConfsStr!= null && clientSysPropsStr!=null) { + udfConfs = (HashMap<UDFContextKey, Properties>) ObjectSerializer.deserialize(udfConfsStr); + clientSysProps = (Properties) ObjectSerializer.deserialize( + clientSysPropsStr); + } + } + private UDFContextKey generateKey(Class<?> c, String[] args) { return new UDFContextKey(c.getName(), args); } @@ -314,4 +336,8 @@ public class UDFContext { } } + public Properties getClientSysProps() { + return clientSysProps; + } + } \ No newline at end of file Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Mon May 29 15:00:39 2017 @@ -26,8 +26,17 @@ import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericEnumSymbol; import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.util.Utf8; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.executionengine.ExecException; @@ -38,6 
+47,8 @@ import org.apache.pig.data.TupleFactory; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; @@ -50,12 +61,12 @@ import java.util.Map; public final class AvroTupleWrapper <T extends IndexedRecord> implements Tuple { private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class); - private TupleFactory mTupleFactory = TupleFactory.getInstance(); + private transient TupleFactory mTupleFactory = TupleFactory.getInstance(); /** * The Avro object wrapped in the pig Tuple. */ - private T avroObject; + private transient T avroObject; /** * Creates a new AvroTupleWrapper object. @@ -205,7 +216,14 @@ public final class AvroTupleWrapper <T e case NULL: break; case STRING: - total += ((String) r.get(f.pos())).length() + Object val = r.get(f.pos()); + String value; + if (val instanceof Utf8) { + value = val.toString(); + } else { + value = (String) val; + } + total += value.length() * (Character.SIZE << bitsPerByte); break; case BYTES: @@ -291,4 +309,21 @@ public final class AvroTupleWrapper <T e ); } + // Required for Java serialization used by Spark: PIG-5134 + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeObject(avroObject.getSchema().toString()); + DatumWriter<T> writer = new GenericDatumWriter<>(); + writer.setSchema(avroObject.getSchema()); + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + writer.write(avroObject, encoder); + encoder.flush(); + } + + // Required for Java serialization used by Spark: PIG-5134 + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + Schema schema = new Schema.Parser().parse((String) in.readObject()); + DatumReader<T> reader = new GenericDatumReader<>(schema); + Decoder decoder = DecoderFactory.get().binaryDecoder(in, null); + avroObject = 
reader.read(avroObject, decoder); + } } Modified: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java (original) +++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java Mon May 29 15:00:39 2017 @@ -74,6 +74,10 @@ public class GroovyEvalFunc<T> extends E resource = ScriptEngine.class.getResource(File.separator + path); } if (resource == null) { + //Try loading the script from other locally available jars (needed for Spark mode) + resource = Thread.currentThread().getContextClassLoader().getResource(path); + } + if (resource == null) { throw new IOException("Cannot find " + path); } } else { Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Mon May 29 15:00:39 2017 @@ -165,7 +165,7 @@ public class PigStatsUtil { private static final String SEPARATOR = "/"; private static final String SEMICOLON = ";"; - private static String getShortName(String uri) { + public static String getShortName(String uri) { int scolon = uri.indexOf(SEMICOLON); int slash; if (scolon!=-1) { Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- 
pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Mon May 29 15:00:39 2017 @@ -27,7 +27,7 @@ import org.apache.pig.classification.Int @InterfaceAudience.Public @InterfaceStability.Evolving -public class PigStatusReporter extends StatusReporter implements Progressable { +public class PigStatusReporter extends StatusReporter implements Progressable, PigWarnCounter { private static PigStatusReporter reporter = null; @@ -86,6 +86,16 @@ public class PigStatusReporter extends S } @Override + public boolean incrWarnCounter(Enum<?> name, Object incr) { + return incrCounter(name, (Long)incr); + } + + @Override + public boolean incrWarnCounter(String group, String name, Object incr) { + return incrCounter(group, name, (Long)incr); + } + + @Override public void progress() { if (context != null) { context.progress(); Added: pig/trunk/src/org/apache/pig/tools/pigstats/PigWarnCounter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigWarnCounter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/PigWarnCounter.java (added) +++ pig/trunk/src/org/apache/pig/tools/pigstats/PigWarnCounter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.tools.pigstats; + +/* + Interface for incrementing warning counters + */ +public interface PigWarnCounter { + + boolean incrWarnCounter(Enum<?> name, Object incr); + + boolean incrWarnCounter(String group, String name, Object incr); +} Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java (added) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java Mon May 29 15:00:39 2017 @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.tools.pigstats.spark; + + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import org.apache.spark.Accumulator; +import org.apache.spark.AccumulatorParam; +import org.apache.spark.api.java.JavaSparkContext; + +public abstract class SparkCounter<T> implements Serializable { + + private String name; + private String displayName; + private Accumulator<T> accumulator; + + public SparkCounter() { + // For serialization. + } + + public SparkCounter( + String name, + String displayName, + String groupName, + T initValue, + JavaSparkContext sparkContext) { + + this.name = name; + this.displayName = displayName; + + String accumulatorName = groupName + "_" + name; + + if (sparkContext == null){ + //Spark executors can register new Accumulators but they won't make it back to the driver hence the limitation + throw new RuntimeException("Not allowed to create SparkCounter on backend executor."); + + } + this.accumulator = sparkContext.accumulator(initValue, accumulatorName, createAccumulatorParam()); + + } + + protected abstract AccumulatorParam<T> createAccumulatorParam(); + + public T getValue() { + if (accumulator != null) { + return accumulator.value(); + } else { + return null; + } + } + + public void increment(T incr) { + accumulator.add(incr); + } + + public String getName() { + return name; + } + + public String getDisplayName() { + return displayName; + } + + public void setDisplayName(String displayName) { + this.displayName = displayName; + } + + public static class LongSparkCounter extends SparkCounter<Long> { + + public LongSparkCounter(){} + + public LongSparkCounter( + String name, + String displayName, + String groupName, + Long initValue, + JavaSparkContext sparkContext){ + super(name, displayName, groupName, initValue, sparkContext); + } + + @Override + protected AccumulatorParam<Long> createAccumulatorParam() { + return new LongAccumulatorParam(); + } + + private class LongAccumulatorParam 
implements AccumulatorParam<Long> { + + @Override + public Long addAccumulator(Long t1, Long t2) { + return t1 + t2; + } + + @Override + public Long addInPlace(Long r1, Long r2) { + return r1 + r2; + } + + @Override + public Long zero(Long initialValue) { + return 0L; + } + } + } + + public static class MapSparkCounter extends SparkCounter<Map<String,Long>> { + + public MapSparkCounter(){} + + public MapSparkCounter( + String name, + String displayName, + String groupName, + Map<String,Long> initValue, + JavaSparkContext sparkContext){ + super(name, displayName, groupName, initValue, sparkContext); + } + + @Override + protected AccumulatorParam<Map<String, Long>> createAccumulatorParam() { + return new MapAccumulatorParam(); + } + + private class MapAccumulatorParam implements AccumulatorParam<Map<String,Long>> { + + @Override + public Map<String, Long> addAccumulator(Map<String, Long> t1, Map<String, Long> t2) { + return addInPlace(t1, t2); + } + + @Override + public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) { + for (String key : r2.keySet()){ + Long r1val = r1.get(key); + Long r2val = r2.get(key); + r1.put(key,r1val == null ? r2val : r1val+r2val); + } + return r1; + } + + @Override + public Map<String, Long> zero(Map<String, Long> initialValue) { + return new HashMap<>(); + } + } + } + +} Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java (added) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java Mon May 29 15:00:39 2017 @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.tools.pigstats.spark; + + +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public abstract class SparkCounterGroup<T> implements Serializable { + protected String groupName; + protected String groupDisplayName; + protected Map<String, SparkCounter<T>> sparkCounters; + + protected transient JavaSparkContext javaSparkContext; + + private SparkCounterGroup() { + // For serialization. 
+ } + + public SparkCounterGroup( + String groupName, + String groupDisplayName, + JavaSparkContext javaSparkContext) { + this.groupName = groupName; + this.groupDisplayName = groupDisplayName; + this.javaSparkContext = javaSparkContext; + this.sparkCounters = new HashMap<String, SparkCounter<T>>(); + } + + public abstract void createCounter(String name, T initValue); + + public SparkCounter getCounter(String name) { + return sparkCounters.get(name); + } + + public String getGroupName() { + return groupName; + } + + public String getGroupDisplayName() { + return groupDisplayName; + } + + public void setGroupDisplayName(String groupDisplayName) { + this.groupDisplayName = groupDisplayName; + } + + public Map<String, SparkCounter<T>> getSparkCounters() { + return sparkCounters; + } + + public static class LongSparkCounterGroup extends SparkCounterGroup<Long> { + + public LongSparkCounterGroup( + String groupName, + String groupDisplayName, + JavaSparkContext javaSparkContext) { + super(groupName,groupDisplayName,javaSparkContext); + } + public void createCounter(String name, Long initValue){ + SparkCounter counter = new SparkCounter.LongSparkCounter(name, name, groupName, initValue, javaSparkContext); + sparkCounters.put(name,counter); + } + } + + public static class MapSparkCounterGroup extends SparkCounterGroup<Map<String,Long>> { + + public MapSparkCounterGroup( + String groupName, + String groupDisplayName, + JavaSparkContext javaSparkContext) { + super(groupName,groupDisplayName,javaSparkContext); + } + public void createCounter(String name, Map<String,Long> initValue){ + SparkCounter counter = new SparkCounter.MapSparkCounter(name, name, groupName, initValue, javaSparkContext); + sparkCounters.put(name,counter); + } + } +} Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java?rev=1796639&view=auto 
============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java (added) +++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java Mon May 29 15:00:39 2017 @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.tools.pigstats.spark; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigWarning; +import org.apache.pig.tools.pigstats.PigWarnCounter; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class SparkCounters implements Serializable, PigWarnCounter { + private static final long serialVersionUID = 1L; + + private static final Log LOG = LogFactory.getLog(SparkCounters.class); + + private Map<String, SparkCounterGroup> sparkCounterGroups; + + private final transient JavaSparkContext javaSparkContext; + + private SparkCounters() { + this(null); + } + + public SparkCounters(JavaSparkContext javaSparkContext) { + this.javaSparkContext = javaSparkContext; + this.sparkCounterGroups = new HashMap<String, SparkCounterGroup>(); + } + + public void createCounter(Enum<?> key) { + createCounter(key.getDeclaringClass().getName(), key.name()); + } + + public void createCounter(String groupName, Enum<?> key) { + createCounter(groupName, key.name(), 0L); + } + + public void createCounter(String groupName, String counterName) { + createCounter(groupName, counterName, 0L); + } + + public void createCounter(String groupName, String counterName, Object initValue) { + getGroup(groupName).createCounter(counterName, initValue); + } + + public void increment(Enum<?> key, long incrValue) { + increment(key.getDeclaringClass().getName(), key.name(), incrValue); + } + + public void increment(String groupName, String counterName, long value) { + SparkCounter counter = getGroup(groupName).getCounter(counterName); + if (counter == null) { + LOG.error(String.format("counter[%s, %s] has not initialized before.", groupName, counterName)); + } else { + counter.increment(value); + } + } + + public Object getValue(String groupName, String counterName) { + SparkCounter counter = 
getGroup(groupName).getCounter(counterName); + if (counter == null) { + LOG.error(String.format("counter[%s, %s] has not initialized before.", groupName, counterName)); + return null; + } else { + return counter.getValue(); + } + } + + public SparkCounter getCounter(String groupName, String counterName) { + return getGroup(groupName).getCounter(counterName); + } + + public SparkCounter getCounter(Enum<?> key) { + return getCounter(key.getDeclaringClass().getName(), key.name()); + } + + private SparkCounterGroup getGroup(String groupName) { + SparkCounterGroup group = sparkCounterGroups.get(groupName); + if (group == null) { + group = new SparkCounterGroup.LongSparkCounterGroup(groupName, groupName, javaSparkContext); + sparkCounterGroups.put(groupName, group); + } + return group; + } + + public Map<String, SparkCounterGroup> getSparkCounterGroups() { + return sparkCounterGroups; + } + + + @Override + public boolean incrWarnCounter(Enum<?> name, Object incr) { + SparkCounter counter = getCounter(PigWarning.SPARK_WARN); + return _incrWarnCounter(counter, name.name(), (Long) incr); + } + + @Override + public boolean incrWarnCounter(String group, String name, Object incr) { + SparkCounter counter = getCounter(PigWarning.SPARK_CUSTOM_WARN); + return _incrWarnCounter(counter, group+"::"+name, (Long) incr); + } + + private static boolean _incrWarnCounter(SparkCounter counter, String name, Long incr) { + if (counter == null){ + return false; + } + Map<String,Long> map = new HashMap<String,Long>(); + map.put(name, incr); + counter.increment(map); + return true; + } +}