Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java Mon May 29 15:00:39 2017
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+
+/**
+ * An operator model for a Spark job. Acts as a host to the plans that will
+ * execute in spark.
+ */
+public class SparkOperator extends Operator<SparkOpPlanVisitor> {
+    private static enum OPER_FEATURE {
+        NONE,
+        // Indicate if this job is a sampling job
+        SAMPLER,
+        // Indicate if this job is a merge indexer
+        INDEXER,
+        // Indicate if this job is a group by job
+        GROUPBY,
+        // Indicate if this job is a cogroup job
+        COGROUP,
+        // Indicate if this job is a regular join job
+        HASHJOIN,
+        // Indicate if this job is a union job
+        UNION,
+        // Indicate if this job is a native job
+        NATIVE,
+        // Indicate if this job is a limit job
+        LIMIT,
+        // Indicate if this job is a limit job after sort
+        LIMIT_AFTER_SORT;
+    };
+
+    public PhysicalPlan physicalPlan;
+
+    public Set<String> UDFs;
+
+    /* Name of the Custom Partitioner used */
+    public String customPartitioner = null;
+
+    public Set<PhysicalOperator> scalars;
+
+    public int requestedParallelism = -1;
+
+    private BitSet feature = new BitSet();
+
+    private boolean splitter = false;
+
+    // Name of the partition file generated by sampling process,
+    // Used by Skewed Join
+    private String skewedJoinPartitionFile;
+
+    private boolean usingTypedComparator = false;
+
+    private boolean combineSmallSplits = true;
+
+    private List<String> crossKeys = null;
+
+    private MultiMap<OperatorKey, OperatorKey> multiQueryOptimizeConnectionMap = new MultiMap<OperatorKey, OperatorKey>();
+
+    // Indicates if a UDF comparator is used
+    boolean isUDFComparatorUsed = false;
+
+    //The quantiles file name if globalSort is true
+    private String quantFile;
+
+    //Indicates if this job is an order by job
+    private boolean globalSort = false;
+
+    public SparkOperator(OperatorKey k) {
+        super(k);
+        physicalPlan = new PhysicalPlan();
+        UDFs = new HashSet<String>();
+        scalars = new HashSet<PhysicalOperator>();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return true;
+    }
+
+    @Override
+    public String name() {
+        String udfStr = getUDFsAsStr();
+        StringBuilder sb = new StringBuilder("Spark" + "("
+                + requestedParallelism + (udfStr.equals("") ? "" : ",")
+                + udfStr + ")" + " - " + mKey.toString());
+        return sb.toString();
+    }
+
+    private String getUDFsAsStr() {
+        StringBuilder sb = new StringBuilder();
+        if (UDFs != null && UDFs.size() > 0) {
+            for (String str : UDFs) {
+                sb.append(str.substring(str.lastIndexOf('.') + 1));
+                sb.append(',');
+            }
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
+    }
+
+    public void add(PhysicalOperator physicalOper) {
+        this.physicalPlan.add(physicalOper);
+    }
+
+    @Override
+    public void visit(SparkOpPlanVisitor v) throws VisitorException {
+        v.visitSparkOp(this);
+    }
+
+    public void addCrossKey(String key) {
+        if (crossKeys == null) {
+            crossKeys = new ArrayList<String>();
+        }
+        crossKeys.add(key);
+    }
+
+    public List<String> getCrossKeys() {
+        return crossKeys;
+    }
+
+    public boolean isGroupBy() {
+        return feature.get(OPER_FEATURE.GROUPBY.ordinal());
+    }
+
+    public void markGroupBy() {
+        feature.set(OPER_FEATURE.GROUPBY.ordinal());
+    }
+
+    public boolean isCogroup() {
+        return feature.get(OPER_FEATURE.COGROUP.ordinal());
+    }
+
+    public void markCogroup() {
+        feature.set(OPER_FEATURE.COGROUP.ordinal());
+    }
+
+    public boolean isRegularJoin() {
+        return feature.get(OPER_FEATURE.HASHJOIN.ordinal());
+    }
+
+    public void markRegularJoin() {
+        feature.set(OPER_FEATURE.HASHJOIN.ordinal());
+    }
+
+    public int getRequestedParallelism() {
+        return requestedParallelism;
+    }
+
+    public void setSplitter(boolean spl) {
+        splitter = spl;
+    }
+
+    public boolean isSplitter() {
+        return splitter;
+    }
+
+    public boolean isSampler() {
+        return feature.get(OPER_FEATURE.SAMPLER.ordinal());
+    }
+
+    public void markSampler() {
+        feature.set(OPER_FEATURE.SAMPLER.ordinal());
+    }
+
+    public void setSkewedJoinPartitionFile(String file) {
+        skewedJoinPartitionFile = file;
+    }
+
+    public String getSkewedJoinPartitionFile() {
+        return skewedJoinPartitionFile;
+    }
+
+    protected boolean usingTypedComparator() {
+        return usingTypedComparator;
+    }
+
+    protected void useTypedComparator(boolean useTypedComparator) {
+        this.usingTypedComparator = useTypedComparator;
+    }
+
+    protected void noCombineSmallSplits() {
+        combineSmallSplits = false;
+    }
+
+    public boolean combineSmallSplits() {
+        return combineSmallSplits;
+    }
+
+    public boolean isIndexer() {
+        return feature.get(OPER_FEATURE.INDEXER.ordinal());
+    }
+
+    public void markIndexer() {
+        feature.set(OPER_FEATURE.INDEXER.ordinal());
+    }
+    public boolean isUnion() {
+        return feature.get(OPER_FEATURE.UNION.ordinal());
+    }
+
+    public void markUnion() {
+        feature.set(OPER_FEATURE.UNION.ordinal());
+    }
+
+    public boolean isNative() {
+        return feature.get(OPER_FEATURE.NATIVE.ordinal());
+    }
+
+    public void markNative() {
+        feature.set(OPER_FEATURE.NATIVE.ordinal());
+    }
+
+    public boolean isLimit() {
+        return feature.get(OPER_FEATURE.LIMIT.ordinal());
+    }
+
+    public void markLimit() {
+        feature.set(OPER_FEATURE.LIMIT.ordinal());
+    }
+
+    public boolean isLimitAfterSort() {
+        return feature.get(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal());
+    }
+
+    public void markLimitAfterSort() {
+        feature.set(OPER_FEATURE.LIMIT_AFTER_SORT.ordinal());
+    }
+
+    public void copyFeatures(SparkOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
+        for (OPER_FEATURE opf : OPER_FEATURE.values()) {
+            if (excludeFeatures != null && excludeFeatures.contains(opf)) {
+                continue;
+            }
+            if (copyFrom.feature.get(opf.ordinal())) {
+                feature.set(opf.ordinal());
+            }
+        }
+    }
+
+    public boolean isSkewedJoin() {
+        return (skewedJoinPartitionFile != null);
+    }
+
+    public void setRequestedParallelism(int requestedParallelism) {
+        this.requestedParallelism = requestedParallelism;
+    }
+
+    public void setRequestedParallelismByReference(SparkOperator oper) {
+        this.requestedParallelism = oper.requestedParallelism;
+    }
+
+    //If the multiquery optimizer is enabled, in some cases the predecessor (from) of a physicalOp (to) will be
+    //the leaf physicalOperator of the previous sparkOperator. See PIG-4675 for details.
+    public void addMultiQueryOptimizeConnectionItem(OperatorKey to, OperatorKey from) {
+        multiQueryOptimizeConnectionMap.put(to, from);
+    }
+
+    public MultiMap<OperatorKey, OperatorKey> getMultiQueryOptimizeConnectionItem() {
+        return multiQueryOptimizeConnectionMap;
+    }
+
+    public void setGlobalSort(boolean globalSort) {
+        this.globalSort = globalSort;
+    }
+
+    public boolean isGlobalSort() {
+        return globalSort;
+    }
+
+}
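
[Editor's note] For orientation, a minimal usage sketch of the feature-marking API above (an illustration, not part of this commit; "scope" and the id are placeholder values):

    // The plan compiler marks a job's features while building the Spark plan;
    // later optimizer passes branch on the feature bits.
    SparkOperator sparkOp = new SparkOperator(new OperatorKey("scope", 1));
    sparkOp.markSampler();          // flag the job as a sampling job
    sparkOp.markGroupBy();          // feature bits are independent, so both can be set
    if (sparkOp.isSampler()) {
        // a later visitor can branch on the feature bit
    }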

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Mon May 29 15:00:39 2017
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * This visitor visits the SparkPlan and does the following for each
+ * SparkOperator - visits the POPackage in the plan and finds the corresponding
+ * POLocalRearrange(s). It then annotates the POPackage with information about
+ * which columns in the "value" are present in the "key" and will need to
+ * stitched in to the "value"
+ */
+public class SparkPOPackageAnnotator extends SparkOpPlanVisitor {
+    private static final Log LOG = LogFactory.getLog(SparkPOPackageAnnotator.class);
+
+    public SparkPOPackageAnnotator(SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        if (!sparkOp.physicalPlan.isEmpty()) {
+            PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(
+                    sparkOp.physicalPlan);
+            pkgDiscoverer.visit();
+        }
+    }
+
+    static class PackageDiscoverer extends PhyPlanVisitor {
+        private POPackage pkg;
+        private PhysicalPlan plan;
+
+        public PackageDiscoverer(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
+                    plan));
+            this.plan = plan;
+        }
+
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            this.pkg = pkg;
+
+            // Find POLocalRearrange(s) corresponding to this POPackage
+            PhysicalOperator graOp = plan.getPredecessors(pkg).get(0);
+            if (! (graOp instanceof POGlobalRearrange)) {
+                  throw new OptimizerException("Package operator is not 
preceded by " +
+                        "GlobalRearrange operator in Spark Plan", 2087, 
PigException.BUG);
+            }
+
+            List<PhysicalOperator> lraOps = plan.getPredecessors(graOp);
+            if (pkg.getNumInps() != lraOps.size()) {
+          throw new OptimizerException("Unexpected problem during 
optimization. " +
+                            "Could not find all LocalRearrange operators. 
Expected " + pkg.getNumInps() +
+                        ". Got " + lraOps.size() + ".", 2086, 
PigException.BUG);
+            }
+            Collections.sort(lraOps);
+            for (PhysicalOperator op : lraOps) {
+                if (! (op instanceof POLocalRearrange)) {
+                    throw new OptimizerException("GlobalRearrange operator can 
only be preceded by " +
+                            "LocalRearrange operator(s) in Spark Plan", 2087, 
PigException.BUG);
+                }
+                annotatePkgWithLRA((POLocalRearrange)op);
+            }
+        };
+
+        private void annotatePkgWithLRA(POLocalRearrange lrearrange)
+                throws VisitorException {
+
+            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+            if (LOG.isDebugEnabled())
+                LOG.debug("Annotating package " + pkg + " with localrearrange operator "
+                        + lrearrange + " with index " + lrearrange.getIndex());
+
+            if (pkg.getPkgr() instanceof LitePackager) {
+                if (lrearrange.getIndex() != 0) {
+                    throw new VisitorException(
+                            "POLocalRearrange for POPackageLite cannot have 
index other than 0, but has index - "
+                                    + lrearrange.getIndex());
+                }
+            }
+
+            // annotate the package with information from the LORearrange
+            // update the keyInfo information if already present in the
+            // POPackage
+            keyInfo = pkg.getPkgr().getKeyInfo();
+            if (keyInfo == null)
+                keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+
+            if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
+                // something is wrong - we should not be getting key info
+                // for the same index from two different Local Rearranges
+                int errCode = 2087;
+                String msg = "Unexpected problem during optimization."
+                        + " Found index:" + lrearrange.getIndex()
+                        + " in multiple LocalRearrange operators.";
+                throw new OptimizerException(msg, errCode, PigException.BUG);
+
+            }
+            keyInfo.put(
+                    Integer.valueOf(lrearrange.getIndex()),
+                    new Pair<Boolean, Map<Integer, Integer>>(lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
+            if (LOG.isDebugEnabled())
+                LOG.debug("KeyInfo for packager for package operator " + pkg + " is " + keyInfo);
+            pkg.getPkgr().setKeyInfo(keyInfo);
+            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+        }
+    }
+}
\ No newline at end of file
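
[Editor's note] To make the annotation concrete, a hedged sketch of the keyInfo structure the visitor builds (an illustration, not part of the commit; the exact column semantics are an assumption): one entry per POLocalRearrange index, pairing the project-star flag with the key/value column map.

    // Hypothetical: input 0 projects two value columns into the key
    // and does not project the whole tuple (project-star = false).
    Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo =
            new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
    Map<Integer, Integer> projectedCols = new HashMap<Integer, Integer>();
    projectedCols.put(0, 0);
    projectedCols.put(2, 1);
    keyInfo.put(0, new Pair<Boolean, Map<Integer, Integer>>(false, projectedCols));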

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor mechanism printing out the Spark plan.
+ */
+public class SparkPrinter extends SparkOpPlanVisitor {
+
+    private PrintStream mStream = null;
+    private boolean isVerbose = true;
+
+    public SparkPrinter(PrintStream ps, SparkOperPlan plan) {
+        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+        mStream = ps;
+        mStream.println("#--------------------------------------------------");
+        mStream.println("# Spark Plan");
+        mStream.println("#--------------------------------------------------");
+    }
+
+    public void setVerbose(boolean verbose) {
+        isVerbose = verbose;
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
+        mStream.println("");
+        mStream.println("Spark node " + sparkOp.getOperatorKey().toString());
+        if (sparkOp instanceof NativeSparkOperator) {
+            mStream.println(((NativeSparkOperator)sparkOp).getCommandString());
+            mStream.println("--------");
+            mStream.println();
+            return;
+        }
+        if (sparkOp.physicalPlan != null && sparkOp.physicalPlan.size() > 0) {
+            PlanPrinter<PhysicalOperator, PhysicalPlan> printer = new PlanPrinter<PhysicalOperator, PhysicalPlan>(
+                    sparkOp.physicalPlan, mStream);
+            printer.setVerbose(isVerbose);
+            printer.visit();
+            mStream.println("--------");
+        }
+        List<POGlobalRearrangeSpark> glrList = PlanHelper.getPhysicalOperators(sparkOp.physicalPlan, POGlobalRearrangeSpark.class);
+        for (POGlobalRearrangeSpark glr : glrList) {
+            if (glr.isUseSecondaryKey()) {
+                mStream.println("POGlobalRearrange(" + glr.getOperatorKey() + 
") uses secondaryKey");
+            }
+        }
+    }
+}
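
[Editor's note] A brief usage sketch (an illustration, not part of the commit; sparkPlan is an assumed SparkOperPlan instance):

    // visit() is inherited from the plan-visitor machinery and throws VisitorException.
    SparkPrinter printer = new SparkPrinter(System.out, sparkPlan);
    printer.setVerbose(false);   // suppress nested plan details
    printer.visit();             // walks the plan depth-first and prints each Spark node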

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java Mon May 29 15:00:39 2017
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.plan;
+
+import java.io.PrintStream;
+import java.io.StringWriter;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.XMLPhysicalPlanPrinter;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import static javax.xml.transform.OutputKeys.INDENT;
+import static javax.xml.transform.OutputKeys.OMIT_XML_DECLARATION;
+
+
+public class XMLSparkPrinter extends SparkOpPlanVisitor {
+
+    private PrintStream mStream = null;
+
+    private Document doc = null;
+    private Element root = null;
+
+    public XMLSparkPrinter(PrintStream ps, SparkOperPlan plan) throws ParserConfigurationException {
+        super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+        mStream = ps;
+        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+        DocumentBuilder builder = factory.newDocumentBuilder();
+        this.doc = builder.newDocument();
+        this.root = this.doc.createElement("sparkPlan");
+        this.doc.appendChild(this.root);
+
+    }
+
+
+    public void closePlan() throws TransformerException {
+        TransformerFactory factory = TransformerFactory.newInstance();
+        Transformer transformer = factory.newTransformer();
+        transformer.setOutputProperty(OMIT_XML_DECLARATION, "yes");
+        transformer.setOutputProperty(INDENT, "yes");
+        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+
+        StringWriter sw = new StringWriter();
+        StreamResult result = new StreamResult(sw);
+        DOMSource source = new DOMSource(doc);
+        transformer.transform(source, result);
+        mStream.println(sw.toString());
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator so) throws VisitorException {
+        Element sparkNode = doc.createElement("sparkNode");
+        sparkNode.setAttribute("scope", "" + so.getOperatorKey().id);
+        if(so instanceof NativeSparkOperator) {
+            Element nativeSparkOper = doc.createElement("nativeSpark");
+            nativeSparkOper.setTextContent(((NativeSparkOperator)so).getCommandString());
+            sparkNode.appendChild(nativeSparkOper);
+            root.appendChild(sparkNode);
+            return;
+        }
+
+        if (so.physicalPlan != null && so.physicalPlan.size() > 0) {
+            XMLPhysicalPlanPrinter<PhysicalPlan> printer = new XMLPhysicalPlanPrinter<>(so.physicalPlan, doc, sparkNode);
+            printer.visit();
+        }
+
+        root.appendChild(sparkNode);
+
+    }
+
+}
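
[Editor's note] Correspondingly, a hedged sketch of driving the XML printer (an illustration, not part of the commit; sparkPlan is an assumed SparkOperPlan instance):

    XMLSparkPrinter xmlPrinter = new XMLSparkPrinter(System.out, sparkPlan);
    xmlPrinter.visit();      // appends one <sparkNode> element per Spark operator
    xmlPrinter.closePlan();  // serializes the accumulated DOM and writes the XML to the stream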

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Mon May 29 15:00:39 2017
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.running;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigRecordReader;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigSplit;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+
+public class PigInputFormatSpark extends PigInputFormat {
+
+    @Override
+    public RecordReader<Text, Tuple> createRecordReader(InputSplit split, TaskAttemptContext context) throws
+            IOException, InterruptedException {
+        resetUDFContext();
+        //PigSplit#conf is the default hadoop configuration; we need to get the configuration
+        //from context.getConfiguration() to retrieve pig properties
+        PigSplit pigSplit = ((SparkPigSplit) split).getWrappedPigSplit();
+        Configuration conf = context.getConfiguration();
+        pigSplit.setConf(conf);
+        //Set the current splitIndex in PigMapReduce.sJobContext.getConfiguration().get(PigImplConstants.PIG_SPLIT_INDEX),
+        //which will be used in POMergeCogroup#setup
+        if (PigMapReduce.sJobContext == null) {
+            PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new JobID());
+        }
+        PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX, pigSplit.getSplitIndex());
+        // The JobConf is first available here in the Spark executor thread, so we initialize PigContext, UDFContext and
+        // SchemaTupleBackend by reading properties from the JobConf
+        initialize(conf);
+
+        SparkRecordReaderFactory sparkRecordReaderFactory = new SparkRecordReaderFactory(pigSplit, context);
+        return sparkRecordReaderFactory.createRecordReader();
+    }
+
+    /**
+     * This is where we have to wrap PigSplits into SparkPigSplits
+     * @param jobcontext
+     * @return
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext jobcontext) throws IOException, InterruptedException {
+        List<InputSplit> sparkPigSplits = new ArrayList<>();
+        List<InputSplit> originalSplits = super.getSplits(jobcontext);
+
+        boolean isFileSplits = true;
+        for (InputSplit inputSplit : originalSplits) {
+            PigSplit split = (PigSplit)inputSplit;
+            if (!(split.getWrappedSplit() instanceof FileSplit)) {
+                isFileSplits = false;
+                break;
+            }
+        }
+
+        for (InputSplit inputSplit : originalSplits) {
+            PigSplit split = (PigSplit) inputSplit;
+            if (!isFileSplits) {
+                sparkPigSplits.add(new SparkPigSplit.GenericSparkPigSplit(split));
+            } else {
+                sparkPigSplits.add(new SparkPigSplit.FileSparkPigSplit(split));
+            }
+        }
+
+        return sparkPigSplits;
+    }
+
+    private void initialize(Configuration jobConf) throws IOException {
+        MapRedUtil.setupUDFContext(jobConf);
+        PigContext pc = (PigContext) ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+        SchemaTupleBackend.initialize(jobConf, pc);
+        PigMapReduce.sJobConfInternal.set(jobConf);
+        PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+        pigHadoopLogger.setAggregate("true".equalsIgnoreCase(jobConf.get("aggregate.warning")));
+        pigHadoopLogger.setReporter((SparkCounters)ObjectSerializer.deserialize(jobConf.get("pig.spark.counters")));
+        PhysicalOperator.setPigLogger(pigHadoopLogger);
+    }
+
+    private void resetUDFContext() {
+        UDFContext.getUDFContext().reset();
+    }
+
+
+    static class SparkRecordReaderFactory extends PigInputFormat.RecordReaderFactory {
+
+        public SparkRecordReaderFactory(InputSplit split, TaskAttemptContext context) throws IOException {
+            super(split, context);
+        }
+
+        @Override
+        public RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException {
+            return new SparkPigRecordReader(inputFormat, pigSplit, loadFunc, context, limit);
+        }
+    }
+}
\ No newline at end of file

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java Mon May 29 15:00:39 2017
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.streaming;
+
+import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
+import org.apache.spark.TaskContext;
+
+public class SparkExecutableManager extends HadoopExecutableManager {
+    @Override
+    protected boolean writeErrorToHDFS(int limit, String taskId) {
+        if (command.getPersistStderr()) {
+            int tipId = TaskContext.get().attemptNumber();
+            return tipId < command.getLogFilesLimit();
+        }
+        return false;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/AccumulatorOptimizer.java Mon May 29 15:00:39 2017
@@ -37,6 +37,6 @@ public class AccumulatorOptimizer extend
 
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
-        AccumulatorOptimizerUtil.addAccumulator(tezOp.plan);
+        AccumulatorOptimizerUtil.addAccumulator(tezOp.plan, tezOp.plan.getRoots());
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Mon May 29 15:00:39 2017
@@ -102,7 +102,8 @@ public class SecondaryKeyOptimizerTez ex
             rearrangePlan = PlanHelper.getLocalRearrangePlanFromSplit(from.plan, connectingLR.getOperatorKey());
         }
 
-        SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(rearrangePlan, to.plan);
+        SecondaryKeyOptimizerUtil secondaryKeyOptUtil = new SecondaryKeyOptimizerUtil();
+        SecondaryKeyOptimizerInfo info = secondaryKeyOptUtil.applySecondaryKeySort(rearrangePlan, to.plan);
         if (info != null) {
             numSortRemoved += info.getNumSortRemoved();
             numDistinctChanged += info.getNumDistinctChanged();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Mon May 29 15:00:39 2017
@@ -37,11 +37,15 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.VisitorException;
 
 public class AccumulatorOptimizerUtil {
     private static final Log LOG = LogFactory.getLog(AccumulatorOptimizerUtil.class);
@@ -57,9 +61,8 @@ public class AccumulatorOptimizerUtil {
         return batchSize;
     }
 
-    public static void addAccumulator(PhysicalPlan plan) {
+    public static void addAccumulator(PhysicalPlan plan, List<PhysicalOperator> pos) {
         // See if this is a map-reduce job
-        List<PhysicalOperator> pos = plan.getRoots();
         if (pos == null || pos.size() == 0) {
             return;
         }
@@ -286,4 +289,4 @@ public class AccumulatorOptimizerUtil {
 
         return false;
     }
-}
+}
\ No newline at end of file

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Mon May 29 15:00:39 2017
@@ -333,7 +333,7 @@ public class CombinerOptimizerUtil {
      * @return null if plan is not combinable, otherwise list of combinable operators
      * @throws VisitorException
      */
-    private static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners)
+    public static List<Pair<PhysicalOperator, PhysicalPlan>> findAlgebraicOps(List<PhysicalPlan> feInners)
             throws VisitorException {
         List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = Lists.newArrayList();
 
@@ -447,7 +447,7 @@ public class CombinerOptimizerUtil {
      * @param keyType type for group-by key
      * @return new POForeach
      */
-    private static POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
+    public static POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
         String scope = foreach.getOperatorKey().scope;
         POForEach newFE = new POForEach(createOperatorKey(scope), new ArrayList<PhysicalPlan>());
         newFE.addOriginalLocation(foreach.getAlias(), foreach.getOriginalLocations());
@@ -471,7 +471,7 @@ public class CombinerOptimizerUtil {
      * @throws CloneNotSupportedException
      * @throws PlanException
      */
-    private static PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan)
+    public static PhysicalPlan createPlanWithPredecessors(PhysicalOperator algeOp, PhysicalPlan pplan)
             throws CloneNotSupportedException, PlanException {
         PhysicalPlan newplan = new PhysicalPlan();
         addPredecessorsToPlan(algeOp, pplan, newplan);
@@ -508,7 +508,7 @@ public class CombinerOptimizerUtil {
      * @throws CloneNotSupportedException
      * @throws PlanException
      */
-    private static void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos)
+    public static void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos)
             throws CloneNotSupportedException, PlanException {
         // an array that we will first populate with physical operators in order
         // of their position in input. Used while adding plans to combine
@@ -578,7 +578,7 @@ public class CombinerOptimizerUtil {
      * @param rearrange
      * @return
      */
-    private static POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
+    public static POPreCombinerLocalRearrange getPreCombinerLR(POLocalRearrange rearrange) {
         String scope = rearrange.getOperatorKey().scope;
         POPreCombinerLocalRearrange pclr = new POPreCombinerLocalRearrange(
                 createOperatorKey(scope),
@@ -587,7 +587,7 @@ public class CombinerOptimizerUtil {
         return pclr;
     }
 
-    private static OperatorKey createOperatorKey(String scope) {
+    public static OperatorKey createOperatorKey(String scope) {
         return new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope));
     }
 
@@ -619,7 +619,7 @@ public class CombinerOptimizerUtil {
      * @param type
      * @throws PlanException
      */
-    private static void changeFunc(POForEach fe, byte type) throws PlanException {
+    public static void changeFunc(POForEach fe, byte type) throws PlanException {
         for (PhysicalPlan plan : fe.getInputPlans()) {
             List<PhysicalOperator> leaves = plan.getLeaves();
             if (leaves == null || leaves.size() != 1) {
@@ -657,7 +657,7 @@ public class CombinerOptimizerUtil {
      * @throws PlanException
      * @throws CloneNotSupportedException
      */
-    private static POLocalRearrange getNewRearrange(POLocalRearrange rearrange)
+    public static POLocalRearrange getNewRearrange(POLocalRearrange rearrange)
             throws PlanException, CloneNotSupportedException {
         POLocalRearrange newRearrange = rearrange.clone();
 
@@ -835,7 +835,7 @@ public class CombinerOptimizerUtil {
      * with
      * POUserFunc(org.apache.pig.builtin.Distinct)[DataBag]
      */
-    private static class DistinctPatcher extends PhyPlanVisitor {
+    public static class DistinctPatcher extends PhyPlanVisitor {
         private POUserFunc distinct = null;
         /**
          * @param plan
@@ -901,12 +901,12 @@ public class CombinerOptimizerUtil {
             }
         }
 
-        POUserFunc getDistinct() {
+        public POUserFunc getDistinct() {
             return distinct;
         }
     }
 
-    private static class fixMapProjects extends PhyPlanVisitor {
+    public static class fixMapProjects extends PhyPlanVisitor {
         public fixMapProjects(PhysicalPlan plan) {
             this(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java Mon May 29 15:00:39 2017
@@ -35,12 +35,15 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -54,7 +57,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class SecondaryKeyOptimizerUtil {
     private static Log log = LogFactory.getLog(SecondaryKeyOptimizerUtil.class.getName());
 
-    private SecondaryKeyOptimizerUtil() {
+    public SecondaryKeyOptimizerUtil() {
 
     }
 
@@ -182,7 +185,7 @@ public class SecondaryKeyOptimizerUtil {
         return result;
     }
 
-    public static SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException {
+    public SecondaryKeyOptimizerInfo applySecondaryKeySort(PhysicalPlan mapPlan, PhysicalPlan reducePlan) throws VisitorException {
         log.trace("Entering SecondaryKeyOptimizerUtil.addSecondaryKeySort");
         SecondaryKeyOptimizerInfo secKeyOptimizerInfo = new SecondaryKeyOptimizerInfo();
         List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>();
@@ -241,14 +244,11 @@ public class SecondaryKeyOptimizerUtil {
         }
 
         PhysicalOperator root = reduceRoots.get(0);
-        if (!(root instanceof POPackage)) {
-            log.debug("Expected reduce root to be a POPackage, skip secondary 
key optimizing");
-            return null;
-        }
+        PhysicalOperator currentNode = getCurrentNode(root,reducePlan);
 
         // visit the POForEach of the reduce plan. We can have Limit and Filter
         // in the middle
-        PhysicalOperator currentNode = root;
+
         POForEach foreach = null;
         while (currentNode != null) {
             if (currentNode instanceof POPackage
@@ -402,12 +402,33 @@ public class SecondaryKeyOptimizerUtil {
                     throw new VisitorException("Cannot find POLocalRearrange 
to set secondary plan", errorCode);
                 }
             }
-            POPackage pack = (POPackage) root;
-            pack.getPkgr().setUseSecondaryKey(true);
+
+            if (root instanceof POGlobalRearrangeSpark) {
+                POGlobalRearrangeSpark plg = (POGlobalRearrangeSpark) root;
+                plg.setUseSecondaryKey(true);
+                plg.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
+            } else if (root instanceof POPackage) {
+                POPackage pack = (POPackage) root;
+                pack.getPkgr().setUseSecondaryKey(true);
+            } else if (root instanceof POReduceBySpark) {
+                POReduceBySpark reduceBySpark = (POReduceBySpark) root;
+                reduceBySpark.setUseSecondaryKey(true);
+                reduceBySpark.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
+            }
         }
         return secKeyOptimizerInfo;
     }
 
+    protected PhysicalOperator getCurrentNode(PhysicalOperator root, PhysicalPlan reducePlan) {
+        PhysicalOperator currentNode = null;
+        if (!(root instanceof POPackage)) {
+            log.debug("Expected reduce root to be a POPackage, skip secondary 
key optimizing");
+        } else {
+            currentNode = root;
+        }
+        return currentNode;
+    }
+
     private static void setSecondaryPlan(PhysicalPlan plan, POLocalRearrange rearrange,
             SortKeyInfo secondarySortKeyInfo) throws VisitorException {
         // Put plan to project secondary key to the POLocalRearrange
@@ -663,5 +684,4 @@ public class SecondaryKeyOptimizerUtil {
         }
         return false;
     }
-
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Mon May 29 15:00:39 2017
@@ -301,8 +301,15 @@ public class HBaseStorage extends LoadFu
         }
 
         columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_);
-
-        String defaultCaster = UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER);
+        //In MR mode, UDFContext.deserialize is called first and then UDFContext.getUDFContext().getClientSystemProps()
+        //is called, so the value is not null.
+        //In Spark mode, when the Spark executor first initializes all the objects,
+        //UDFContext.getUDFContext().getClientSystemProps() is null and UDFContext.deserialize is called afterwards,
+        //so we need to check whether UDFContext.getUDFContext().getClientSystemProps() is null or not:
+        //if it is null, defaultCaster = STRING_CASTER, otherwise
+        //UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER).
+        //See PIG-4920 for details.
+        String defaultCaster = UDFContext.getUDFContext().getClientSystemProps() != null ? UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER) : STRING_CASTER;
         String casterOption = configuredOptions_.getOptionValue("caster", 
defaultCaster);
         if (STRING_CASTER.equalsIgnoreCase(casterOption)) {
             caster_ = new Utf8StorageConverter();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Mon May 29 15:00:39 2017
@@ -172,7 +172,7 @@ public class HadoopExecutableManager ext
      * @return <code>true</code> if stderr data of task should be persisted on 
      *         HDFS, <code>false</code> otherwise
      */
-    private boolean writeErrorToHDFS(int limit, String taskId) {
+    protected boolean writeErrorToHDFS(int limit, String taskId) {
         if (command.getPersistStderr() && taskId != null) {
             int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
             return tipId < command.getLogFilesLimit();

Modified: pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/SelfSpillBag.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/SelfSpillBag.java Mon May 29 15:00:39 2017
@@ -22,6 +22,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 
+import java.io.Serializable;
+
 /**
  * Class to hold code common to self spilling bags such as InternalCachedBag
  */
@@ -29,6 +31,7 @@ import org.apache.pig.classification.Int
 @InterfaceStability.Evolving
 public abstract class SelfSpillBag extends DefaultAbstractBag {
     private static final long serialVersionUID = 1L;
+    // SelfSpillBag$MemoryLimits is not serializable
     protected MemoryLimits memLimit;
 
     public SelfSpillBag(int bagCount) {
@@ -47,10 +50,11 @@ public abstract class SelfSpillBag exten
      * The number of objects that will fit into this memory limit is computed
      * using the average memory size of the objects whose size is given to this
      * class.
+     * In Spark mode, MemoryLimits needs to implement the Serializable interface,
+     * otherwise a NotSerializableException will be thrown (see PIG-4611).
      */
     @InterfaceAudience.Private
     @InterfaceStability.Evolving
-    public static class MemoryLimits {
+    public static class MemoryLimits implements Serializable {
 
         private long maxMemUsage;
         private long cacheLimit = Integer.MAX_VALUE;
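
[Editor's note] The failure mode behind this change, as a minimal sketch (an illustration with hypothetical names, not part of the commit): Spark Java-serializes the objects it ships to executors, so any reachable non-Serializable field aborts the task.

    // If MemoryLimits did not implement Serializable, this write would throw
    // java.io.NotSerializableException for any bag holding a MemoryLimits field.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(bag);    // "bag" is a hypothetical InternalCachedBag instance
    oos.close();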

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Mon May 29 15:00:39 2017
@@ -17,6 +17,9 @@
  */
 package org.apache.pig.impl;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -26,6 +29,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.lang.reflect.Constructor;
@@ -906,5 +911,5 @@ public class PigContext implements Seria
         } else {
             classloader = new ContextClassLoader(cl);
         }
-    }
+   }
 }

Modified: pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/StreamingUDF.java Mon May 29 15:00:39 2017
@@ -242,8 +242,11 @@ public class StreamingUDF extends EvalFu
             }
             InputStream udfFileStream = this.getClass().getResourceAsStream(
                     absolutePath + getUserFileExtension());
-            command[PATH_TO_FILE_CACHE] = "\"" + 
userUdfFile.getParentFile().getAbsolutePath()
-                    + "\"";
+            if (udfFileStream == null) {
+                //Try loading the script from other locally available jars (needed for Spark mode)
+                udfFileStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(filePath + getUserFileExtension());
+            }
+            command[PATH_TO_FILE_CACHE] = "\"" + userUdfFile.getParentFile().getAbsolutePath() + "\"";
 
             try {
                 FileUtils.copyInputStreamToFile(udfFileStream, userUdfFile);

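The fallback added above generalizes to any resource bundled in the job jars; a sketch under that assumption (ResourceLoader is a hypothetical helper, not part of this commit):

    import java.io.InputStream;

    public final class ResourceLoader {
        private ResourceLoader() {}

        // Try the owning class's loader first, then the thread context
        // classloader, which Spark executors populate with the job's jars.
        public static InputStream open(Class<?> owner, String name) {
            InputStream is = owner.getResourceAsStream(name);
            if (is == null) {
                is = Thread.currentThread().getContextClassLoader().getResourceAsStream(name);
            }
            return is;
        }
    }
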
Modified: pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Mon May 29 
15:00:39 2017
@@ -534,6 +534,24 @@ public abstract class OperatorPlan<E ext
             connect(oper, leaf);
         }
     }
+
+    /**
+     * Adds the root operator to the plan and connects
+     * all existing roots to the new root.
+     *
+     * @param root
+     * @throws PlanException
+     */
+    public void addAsRoot(E root) throws PlanException {
+        List<E> oldRoots = new ArrayList<E>();
+        for (E operator : getRoots()) {
+            oldRoots.add(operator);
+        }
+        add(root);
+        for (E oper : oldRoots) {
+            connect(root, oper);
+        }
+    }
     
     public boolean isSingleLeafPlan() {
         List<E> tmpList = getLeaves() ;

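In effect (a sketch; plan is any concrete OperatorPlan and newRoot a fresh operator of the matching type):

    // roots before: {a, b}
    plan.addAsRoot(newRoot);
    // roots after: {newRoot}, with new edges newRoot->a and newRoot->b,
    // so every former root now hangs off the single new root
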
Modified: pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Mon May 29 15:00:39 
2017
@@ -23,11 +23,14 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 
 public class UDFContext {
 
+    private static final Log LOG = LogFactory.getLog(UDFContext.class);
     private Configuration jconf = null;
     private HashMap<UDFContextKey, Properties> udfConfs;
     private Properties clientSysProps;
@@ -76,7 +79,7 @@ public class UDFContext {
     /*
      *  internal pig use only - should NOT be called from user code
      */
-    HashMap<UDFContextKey, Properties> getUdfConfs() {
+    public HashMap<UDFContextKey, Properties> getUdfConfs() {
         return udfConfs;
     }
 
@@ -204,6 +207,17 @@ public class UDFContext {
         conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
     }
 
+    /*
+     * Internal pig use
+     */
+    public String serialize() {
+        try {
+            return ObjectSerializer.serialize(udfConfs);
+        } catch (IOException e) {
+            LOG.error("UDFContext#serialize throws error", e);
+            return null;
+        }
+    }
 
     /**
      * Populate the udfConfs field.  This function is intended to
@@ -218,6 +232,14 @@ public class UDFContext {
                 jconf.get(CLIENT_SYS_PROPS));
     }
 
+    public void deserializeForSpark(String udfConfsStr, String clientSysPropsStr) throws IOException {
+        if (udfConfsStr != null && clientSysPropsStr != null) {
+            udfConfs = (HashMap<UDFContextKey, Properties>) ObjectSerializer.deserialize(udfConfsStr);
+            clientSysProps = (Properties) ObjectSerializer.deserialize(
+                    clientSysPropsStr);
+        }
+    }
+
     private UDFContextKey generateKey(Class<?> c, String[] args) {
         return new UDFContextKey(c.getName(), args);
     }
@@ -314,4 +336,8 @@ public class UDFContext {
         }
     }
 
+    public Properties getClientSysProps() {
+        return clientSysProps;
+    }
+
 }
\ No newline at end of file

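The intended round-trip, sketched (a sketch, not part of this commit; the Spark backend is expected to carry both strings from the driver to the executors):

    import java.io.IOException;
    import org.apache.pig.impl.util.ObjectSerializer;
    import org.apache.pig.impl.util.UDFContext;

    public class UdfContextShipSketch {
        // Driver side, before shipping tasks:
        static String[] capture() throws IOException {
            UDFContext ctx = UDFContext.getUDFContext();
            return new String[] {
                    ctx.serialize(),
                    ObjectSerializer.serialize(ctx.getClientSysProps()) };
        }

        // Executor side, before the first UDF call:
        static void restore(String udfConfsStr, String clientSysPropsStr) throws IOException {
            UDFContext.getUDFContext().deserializeForSpark(udfConfsStr, clientSysPropsStr);
        }
    }
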
Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Mon May 
29 15:00:39 2017
@@ -26,8 +26,17 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -38,6 +47,8 @@ import org.apache.pig.data.TupleFactory;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
@@ -50,12 +61,12 @@ import java.util.Map;
 public final class AvroTupleWrapper <T extends IndexedRecord>
     implements Tuple {
     private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);
-    private TupleFactory mTupleFactory = TupleFactory.getInstance();
+    private transient TupleFactory mTupleFactory = TupleFactory.getInstance();
 
   /**
    * The Avro object wrapped in the pig Tuple.
    */
-  private T avroObject;
+  private transient T avroObject;
 
   /**
    * Creates a new AvroTupleWrapper object.
@@ -205,7 +216,14 @@ public final class AvroTupleWrapper <T e
       case NULL:
         break;
       case STRING:
-        total += ((String) r.get(f.pos())).length()
+        Object val = r.get(f.pos());
+        String value;
+        if (val instanceof Utf8) {
+          value = val.toString();
+        } else {
+          value = (String) val;
+        }
+        total += value.length()
            * (Character.SIZE << bitsPerByte);
         break;
       case BYTES:
@@ -291,4 +309,21 @@ public final class AvroTupleWrapper <T e
         );
   }
 
+  // Required for Java serialization used by Spark: PIG-5134
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.writeObject(avroObject.getSchema().toString());
+    DatumWriter<T> writer = new GenericDatumWriter<>();
+    writer.setSchema(avroObject.getSchema());
+    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+    writer.write(avroObject, encoder);
+    encoder.flush();
+  }
+
+  // Required for Java serialization used by Spark: PIG-5134
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    Schema schema = new Schema.Parser().parse((String) in.readObject());
+    DatumReader<T> reader = new GenericDatumReader<>(schema);
+    Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);
+    avroObject = reader.read(avroObject, decoder);
+  }
 }

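A minimal round-trip through the custom hooks above (a sketch, not part of this commit; the caller supplies a GenericData.Record with a known schema):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.avro.generic.GenericData;
    import org.apache.pig.impl.util.avro.AvroTupleWrapper;

    public class AvroTupleRoundTrip {
        static AvroTupleWrapper<?> roundTrip(GenericData.Record record) throws Exception {
            AvroTupleWrapper<GenericData.Record> tuple = new AvroTupleWrapper<>(record);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(tuple);   // writes the schema string plus the Avro binary payload
            oos.close();
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
            return (AvroTupleWrapper<?>) ois.readObject();
        }
    }
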
Modified: pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java (original)
+++ pig/trunk/src/org/apache/pig/scripting/groovy/GroovyEvalFunc.java Mon May 
29 15:00:39 2017
@@ -74,6 +74,10 @@ public class GroovyEvalFunc<T> extends E
               resource = ScriptEngine.class.getResource(File.separator + path);
           }
           if (resource == null) {
+            //Try loading the script from other locally available jars (needed for Spark mode)
+              resource = Thread.currentThread().getContextClassLoader().getResource(path);
+          }
+          if (resource == null) {
               throw new IOException("Cannot find " + path);
           }
       } else {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Mon May 29 
15:00:39 2017
@@ -165,7 +165,7 @@ public class PigStatsUtil {
     private static final String SEPARATOR = "/";
     private static final String SEMICOLON = ";";
 
-    private static String getShortName(String uri) {
+    public static String getShortName(String uri) {
         int scolon = uri.indexOf(SEMICOLON);
         int slash;
         if (scolon!=-1) {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1796639&r1=1796638&r2=1796639&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java 
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Mon May 
29 15:00:39 2017
@@ -27,7 +27,7 @@ import org.apache.pig.classification.Int
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class PigStatusReporter extends StatusReporter implements Progressable {
+public class PigStatusReporter extends StatusReporter implements Progressable, PigWarnCounter {
 
     private static PigStatusReporter reporter = null;
 
@@ -86,6 +86,16 @@ public class PigStatusReporter extends S
     }
 
     @Override
+    public boolean incrWarnCounter(Enum<?> name, Object incr) {
+        return incrCounter(name, (Long)incr);
+    }
+
+    @Override
+    public boolean incrWarnCounter(String group, String name, Object incr) {
+        return incrCounter(group, name, (Long)incr);
+    }
+
+    @Override
     public void progress() {
         if (context != null) {
             context.progress();

Added: pig/trunk/src/org/apache/pig/tools/pigstats/PigWarnCounter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigWarnCounter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigWarnCounter.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigWarnCounter.java Mon May 29 
15:00:39 2017
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats;
+
+/**
+ * Interface for incrementing warning counters.
+ */
+public interface PigWarnCounter {
+
+    boolean incrWarnCounter(Enum<?> name, Object incr);
+
+    boolean incrWarnCounter(String group, String name, Object incr);
+}

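A sketch of the intended call site (not part of this commit): warning-emitting code can hold the common interface and stay backend-agnostic.

    import org.apache.pig.PigWarning;
    import org.apache.pig.tools.pigstats.PigStatusReporter;
    import org.apache.pig.tools.pigstats.PigWarnCounter;

    public class WarnSketch {
        static void warnOnce() {
            // MR/Tez path; in Spark mode a SparkCounters instance (added below) is used instead.
            PigWarnCounter warnCounter = PigStatusReporter.getInstance();
            warnCounter.incrWarnCounter(PigWarning.UDF_WARNING_1, 1L);
        }
    }
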
Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounter.java Mon May 
29 15:00:39 2017
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public abstract class SparkCounter<T> implements Serializable {
+
+    private String name;
+    private String displayName;
+    private Accumulator<T> accumulator;
+
+    public SparkCounter() {
+        // For serialization.
+    }
+
+    public SparkCounter(
+            String name,
+            String displayName,
+            String groupName,
+            T initValue,
+            JavaSparkContext sparkContext) {
+
+        this.name = name;
+        this.displayName = displayName;
+
+        String accumulatorName = groupName + "_" + name;
+
+        if (sparkContext == null) {
+            //Spark executors can register new Accumulators, but their values won't make it back to the driver; hence the limitation
+            throw new RuntimeException("Not allowed to create SparkCounter on backend executor.");
+
+        }
+        this.accumulator = sparkContext.accumulator(initValue, accumulatorName, createAccumulatorParam());
+
+    }
+
+    protected abstract AccumulatorParam<T> createAccumulatorParam();
+
+    public T getValue() {
+        if (accumulator != null) {
+            return accumulator.value();
+        } else {
+            return null;
+        }
+    }
+
+    public void increment(T incr) {
+        accumulator.add(incr);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    public void setDisplayName(String displayName) {
+        this.displayName = displayName;
+    }
+
+    public static class LongSparkCounter extends SparkCounter<Long> {
+
+        public LongSparkCounter(){}
+
+        public LongSparkCounter(
+                String name,
+                String displayName,
+                String groupName,
+                Long initValue,
+                JavaSparkContext sparkContext){
+            super(name, displayName, groupName, initValue, sparkContext);
+        }
+
+        @Override
+        protected AccumulatorParam<Long> createAccumulatorParam() {
+            return new LongAccumulatorParam();
+        }
+
+        private class LongAccumulatorParam implements AccumulatorParam<Long> {
+
+            @Override
+            public Long addAccumulator(Long t1, Long t2) {
+                return t1 + t2;
+            }
+
+            @Override
+            public Long addInPlace(Long r1, Long r2) {
+                return r1 + r2;
+            }
+
+            @Override
+            public Long zero(Long initialValue) {
+                return 0L;
+            }
+        }
+    }
+
+    public static class MapSparkCounter extends SparkCounter<Map<String,Long>> {
+
+        public MapSparkCounter(){}
+
+        public MapSparkCounter(
+                String name,
+                String displayName,
+                String groupName,
+                Map<String,Long> initValue,
+                JavaSparkContext sparkContext){
+            super(name, displayName, groupName, initValue, sparkContext);
+        }
+
+        @Override
+        protected AccumulatorParam<Map<String, Long>> createAccumulatorParam() {
+            return new MapAccumulatorParam();
+        }
+
+        private class MapAccumulatorParam implements AccumulatorParam<Map<String,Long>> {
+
+            @Override
+            public Map<String, Long> addAccumulator(Map<String, Long> t1, Map<String, Long> t2) {
+                return addInPlace(t1, t2);
+            }
+
+            @Override
+            public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) {
+                for (String key : r2.keySet()){
+                    Long r1val = r1.get(key);
+                    Long r2val = r2.get(key);
+                    r1.put(key,r1val == null ? r2val : r1val+r2val);
+                }
+                return r1;
+            }
+
+            @Override
+            public Map<String, Long> zero(Map<String, Long> initialValue) {
+                return new HashMap<>();
+            }
+        }
+    }
+
+}

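Driver-side usage, sketched (a sketch, not part of this commit; jsc is an assumed live JavaSparkContext):

    import org.apache.pig.tools.pigstats.spark.SparkCounter;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CounterSketch {
        static void demo(JavaSparkContext jsc) {
            SparkCounter<Long> records = new SparkCounter.LongSparkCounter(
                    "RECORDS_READ", "Records read", "MyGroup", 0L, jsc);
            records.increment(1L);            // executor-safe: backed by a Spark Accumulator
            Long total = records.getValue();  // meaningful only on the driver
        }
    }
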
Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java 
(added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java 
Mon May 29 15:00:39 2017
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class SparkCounterGroup<T> implements Serializable {
+    protected String groupName;
+    protected String groupDisplayName;
+    protected Map<String, SparkCounter<T>> sparkCounters;
+
+    protected transient JavaSparkContext javaSparkContext;
+
+    private SparkCounterGroup() {
+        // For serialization.
+    }
+
+    public SparkCounterGroup(
+            String groupName,
+            String groupDisplayName,
+            JavaSparkContext javaSparkContext) {
+        this.groupName = groupName;
+        this.groupDisplayName = groupDisplayName;
+        this.javaSparkContext = javaSparkContext;
+        this.sparkCounters = new HashMap<String, SparkCounter<T>>();
+    }
+
+    public abstract void createCounter(String name, T initValue);
+
+    public SparkCounter getCounter(String name) {
+        return sparkCounters.get(name);
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public String getGroupDisplayName() {
+        return groupDisplayName;
+    }
+
+    public void setGroupDisplayName(String groupDisplayName) {
+        this.groupDisplayName = groupDisplayName;
+    }
+
+    public Map<String, SparkCounter<T>> getSparkCounters() {
+        return sparkCounters;
+    }
+
+    public static class LongSparkCounterGroup extends SparkCounterGroup<Long> {
+
+        public LongSparkCounterGroup(
+                String groupName,
+                String groupDisplayName,
+                JavaSparkContext javaSparkContext) {
+            super(groupName,groupDisplayName,javaSparkContext);
+        }
+        public void createCounter(String name, Long initValue){
+            SparkCounter counter = new SparkCounter.LongSparkCounter(name, name, groupName, initValue, javaSparkContext);
+            sparkCounters.put(name,counter);
+        }
+    }
+
+    public static class MapSparkCounterGroup extends SparkCounterGroup<Map<String,Long>> {
+
+        public MapSparkCounterGroup(
+                String groupName,
+                String groupDisplayName,
+                JavaSparkContext javaSparkContext) {
+            super(groupName,groupDisplayName,javaSparkContext);
+        }
+        public void createCounter(String name, Map<String,Long> initValue){
+            SparkCounter counter = new SparkCounter.MapSparkCounter(name, name, groupName, initValue, javaSparkContext);
+            sparkCounters.put(name,counter);
+        }
+    }
+}

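Usage sketch for the map-valued variant (jsc is again an assumed JavaSparkContext; not part of this commit):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pig.tools.pigstats.spark.SparkCounterGroup;
    import org.apache.spark.api.java.JavaSparkContext;

    public class GroupSketch {
        static void demo(JavaSparkContext jsc) {
            SparkCounterGroup<Map<String, Long>> group =
                    new SparkCounterGroup.MapSparkCounterGroup("warnings", "warnings", jsc);
            group.createCounter("SPARK_WARN", new HashMap<String, Long>());
            // Each increment merges a per-key delta into the accumulated map.
            group.getCounter("SPARK_WARN").increment(Collections.singletonMap("UDF_WARNING_1", 1L));
        }
    }
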
Added: pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java?rev=1796639&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java (added)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/spark/SparkCounters.java Mon 
May 29 15:00:39 2017
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
+import org.apache.pig.tools.pigstats.PigWarnCounter;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SparkCounters implements Serializable, PigWarnCounter {
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = LogFactory.getLog(SparkCounters.class);
+
+    private Map<String, SparkCounterGroup> sparkCounterGroups;
+
+    private final transient JavaSparkContext javaSparkContext;
+
+    private SparkCounters() {
+        this(null);
+    }
+
+    public SparkCounters(JavaSparkContext javaSparkContext) {
+        this.javaSparkContext = javaSparkContext;
+        this.sparkCounterGroups = new HashMap<String, SparkCounterGroup>();
+    }
+
+    public void createCounter(Enum<?> key) {
+        createCounter(key.getDeclaringClass().getName(), key.name());
+    }
+
+    public void createCounter(String groupName, Enum<?> key) {
+        createCounter(groupName, key.name(), 0L);
+    }
+
+    public void createCounter(String groupName, String counterName) {
+        createCounter(groupName, counterName, 0L);
+    }
+
+    public void createCounter(String groupName, String counterName, Object initValue) {
+        getGroup(groupName).createCounter(counterName, initValue);
+    }
+
+    public void increment(Enum<?> key, long incrValue) {
+        increment(key.getDeclaringClass().getName(), key.name(), incrValue);
+    }
+
+    public void increment(String groupName, String counterName, long value) {
+        SparkCounter counter = getGroup(groupName).getCounter(counterName);
+        if (counter == null) {
+            LOG.error(String.format("counter[%s, %s] has not been initialized.", groupName, counterName));
+        } else {
+            counter.increment(value);
+        }
+    }
+
+    public Object getValue(String groupName, String counterName) {
+        SparkCounter counter = getGroup(groupName).getCounter(counterName);
+        if (counter == null) {
+            LOG.error(String.format("counter[%s, %s] has not been initialized.", groupName, counterName));
+            return null;
+        } else {
+            return counter.getValue();
+        }
+    }
+
+    public SparkCounter getCounter(String groupName, String counterName) {
+        return getGroup(groupName).getCounter(counterName);
+    }
+
+    public SparkCounter getCounter(Enum<?> key) {
+        return getCounter(key.getDeclaringClass().getName(), key.name());
+    }
+
+    private SparkCounterGroup getGroup(String groupName) {
+        SparkCounterGroup group = sparkCounterGroups.get(groupName);
+        if (group == null) {
+            group = new SparkCounterGroup.LongSparkCounterGroup(groupName, groupName, javaSparkContext);
+            sparkCounterGroups.put(groupName, group);
+        }
+        return group;
+    }
+
+    public Map<String, SparkCounterGroup> getSparkCounterGroups() {
+        return sparkCounterGroups;
+    }
+
+
+    @Override
+    public boolean incrWarnCounter(Enum<?> name, Object incr) {
+        SparkCounter counter = getCounter(PigWarning.SPARK_WARN);
+        return _incrWarnCounter(counter, name.name(), (Long) incr);
+    }
+
+    @Override
+    public boolean incrWarnCounter(String group, String name, Object incr) {
+        SparkCounter counter = getCounter(PigWarning.SPARK_CUSTOM_WARN);
+        return _incrWarnCounter(counter, group+"::"+name, (Long) incr);
+    }
+
+    private static boolean _incrWarnCounter(SparkCounter counter, String name, Long incr) {
+        if (counter == null){
+            return false;
+        }
+        Map<String,Long> map = new HashMap<String,Long>();
+        map.put(name, incr);
+        counter.increment(map);
+        return true;
+    }
+}

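End-to-end, sketched (a sketch, not part of this commit; jsc is an assumed JavaSparkContext):

    import org.apache.pig.tools.pigstats.spark.SparkCounters;
    import org.apache.spark.api.java.JavaSparkContext;

    public class WarnCounterSketch {
        static void demo(JavaSparkContext jsc) {
            // Driver: register a Long counter before the job runs.
            SparkCounters counters = new SparkCounters(jsc);
            counters.createCounter("MyGroup", "RECORDS_SKIPPED");   // starts at 0L

            // Executors: SparkCounters is Serializable, so it can travel in task closures.
            counters.increment("MyGroup", "RECORDS_SKIPPED", 1L);

            // Driver, after the job completes:
            Long skipped = (Long) counters.getValue("MyGroup", "RECORDS_SKIPPED");
        }
    }
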
