Author: dvryaboy Date: Wed Jul 14 06:14:33 2010 New Revision: 963952 URL: http://svn.apache.org/viewvc?rev=963952&view=rev Log: PIG-1428: Make a StatusReporter singleton available for incrementing counters (dvryaboy)
Added: hadoop/pig/branches/branch-0.7/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultAbstractBag.java Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=963952&r1=963951&r2=963952&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/CHANGES.txt (original) +++ hadoop/pig/branches/branch-0.7/CHANGES.txt Wed Jul 14 06:14:33 2010 @@ -22,9 +22,6 @@ Release 0.7.0 - 2010-05-03 INCOMPATIBLE CHANGES -PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs -(rding) - PIG-1292: Interface Refinements (hashutosh) PIG-1259: ResourceFieldSchema.setSchema should not allow a bag field without a @@ -71,6 +68,11 @@ manner (rding via pradeepkth) IMPROVEMENTS +PIG-1428: Make a StatusReporter singleton available for incrementing counters (dvryaboy) + +PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs +(rding) + PIG-1309: Map-side Cogroup (hashutosh) PIG-1441: new test targets (olgan) Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=963952&r1=963951&r2=963952&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Wed Jul 14 06:14:33 2010 @@ -22,9 +22,12 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl; +import org.apache.pig.tools.pigstats.PigStatusReporter; + /** * This class is used to have a POStore write to DFS via a output * collector/record writer. 
It sets up a modified job configuration to @@ -36,21 +39,27 @@ public class MapReducePOStoreImpl extend private TaskAttemptContext context; + private PigStatusReporter reporter; + @SuppressWarnings("unchecked") private RecordWriter writer; - public MapReducePOStoreImpl(TaskAttemptContext context) { + public MapReducePOStoreImpl(TaskInputOutputContext context) { // get a copy of the Configuration so that changes to the // configuration below (like setting the output location) do // not affect the caller's copy Configuration outputConf = new Configuration(context.getConfiguration()); + PigStatusReporter.setContext(context); + reporter = PigStatusReporter.getInstance(); + // make a copy of the Context to use here - since in the same // task (map or reduce) we could have multiple stores, we should // make this copy so that the same context does not get over-written // by the different stores. this.context = new TaskAttemptContext(outputConf, context.getTaskAttemptID()); + } @SuppressWarnings("unchecked") Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=963952&r1=963951&r2=963952&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Wed Jul 14 06:14:33 2010 @@ -42,6 +42,7 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.tools.pigstats.PigStatusReporter; public class PigCombiner { @@ -134,7 +135,8 @@ public class 
PigCombiner { PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); pigHadoopLogger.setAggregate(aggregateWarning); - pigHadoopLogger.setTaskIOContext(context); + PigStatusReporter.setContext(context); + pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); PhysicalOperator.setPigLogger(pigHadoopLogger); } Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=963952&r1=963951&r2=963952&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed Jul 14 06:14:33 2010 @@ -19,9 +19,9 @@ package org.apache.pig.backend.hadoop.ex import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.TaskInputOutputContext; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger; +import org.apache.pig.tools.pigstats.PigStatusReporter; + /** * @@ -40,7 +40,7 @@ public final class PigHadoopLogger imple } private static Log log = LogFactory.getLog(PigHadoopLogger.class); - private TaskInputOutputContext<?, ?, ?, ?> taskIOContext = null; + private PigStatusReporter reporter = null; private boolean aggregate = false; private PigHadoopLogger() { @@ -50,10 +50,9 @@ public final class PigHadoopLogger imple public void warn(Object o, String msg, Enum warningEnum) { String displayMessage = o.getClass().getName() + ": " + msg; if(aggregate) { - if(taskIOContext != null) { - Counter c = taskIOContext.getCounter(warningEnum); - c.increment(1); - } 
else { + if (reporter != null) { + reporter.getCounter(warningEnum).increment(1); + } else { //TODO: //in local mode of execution if the PigHadoopLogger is used initially, //then aggregation cannot be performed as the reporter will be null. @@ -69,14 +68,10 @@ public final class PigHadoopLogger imple } } - public TaskInputOutputContext<?, ?, ?, ?> getTaskIOContext() { - return taskIOContext; + public synchronized void setReporter(PigStatusReporter rep) { + this.reporter = rep; } - public synchronized void setTaskIOContext(TaskInputOutputContext<?, ?, ?, ?> tioc) { - this.taskIOContext = tioc; - } - public boolean getAggregate() { return aggregate; } Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=963952&r1=963951&r2=963952&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Wed Jul 14 06:14:33 2010 @@ -48,6 +48,7 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.SpillableMemoryManager; +import org.apache.pig.tools.pigstats.PigStatusReporter; public abstract class PigMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> { private static final Tuple DUMMYTUPLE = null; @@ -211,7 +212,8 @@ public abstract class PigMapBase extends PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); pigHadoopLogger.setAggregate(aggregateWarning); - pigHadoopLogger.setTaskIOContext(context); + PigStatusReporter.setContext(context); + 
pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); PhysicalOperator.setPigLogger(pigHadoopLogger); } Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=963952&r1=963951&r2=963952&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Wed Jul 14 06:14:33 2010 @@ -55,6 +55,8 @@ import org.apache.pig.impl.plan.VisitorE import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.SpillableMemoryManager; import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.tools.pigstats.PigStatusReporter; + /** * This class is the static Mapper & Reducer classes that @@ -348,8 +350,9 @@ public class PigMapReduce { PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); pigHadoopLogger.setAggregate(aggregateWarning); - pigHadoopLogger.setTaskIOContext(context); - + PigStatusReporter.setContext(context); + pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); + PhysicalOperator.setPigLogger(pigHadoopLogger); for (POStore store: stores) { @@ -560,7 +563,8 @@ public class PigMapReduce { PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); pigHadoopLogger.setAggregate(aggregateWarning); - pigHadoopLogger.setTaskIOContext(context); + PigStatusReporter.setContext(context); + pigHadoopLogger.setReporter(PigStatusReporter.getInstance()); PhysicalOperator.setPigLogger(pigHadoopLogger); for (POStore store: stores) { Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultAbstractBag.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultAbstractBag.java?rev=963952&r1=963951&r2=963952&view=diff ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultAbstractBag.java (original) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/data/DefaultAbstractBag.java Wed Jul 14 06:14:33 2010 @@ -37,6 +37,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger; import org.apache.pig.impl.util.BagFormat; import org.apache.pig.impl.util.Spillable; +import org.apache.pig.tools.pigstats.PigStatusReporter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -373,10 +374,12 @@ public abstract class DefaultAbstractBag } protected void incSpillCount(Enum counter) { - // Increment the spill count - // warn is a misnomer. The function updates the counter. If the update - // fails, it dumps a warning - PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter); + PigStatusReporter reporter = PigStatusReporter.getInstance(); + if (reporter != null) { + reporter.getCounter(counter).increment(1); + } else { + PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter); + } } public static abstract class BagDelimiterTuple extends DefaultTuple{} Added: hadoop/pig/branches/branch-0.7/src/org/apache/pig/tools/pigstats/PigStatusReporter.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=963952&view=auto ============================================================================== --- hadoop/pig/branches/branch-0.7/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (added) +++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Wed Jul 14 06:14:33 2010 @@ -0,0 +1,73 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.tools.pigstats; + +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.StatusReporter; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; +import org.apache.hadoop.util.Progressable; + +@SuppressWarnings("unchecked") +public class PigStatusReporter extends StatusReporter implements Progressable { + + private TaskInputOutputContext context; + private static PigStatusReporter reporter = null; + /** + * Get singleton instance of the context + */ + public static PigStatusReporter getInstance() { + if (reporter == null) { + reporter = new PigStatusReporter(null); + } + return reporter; + } + + public static void setContext(TaskInputOutputContext context) { + reporter = new PigStatusReporter(context); + } + + private PigStatusReporter(TaskInputOutputContext context) { + this.context = context; + } + + @Override + public Counter getCounter(Enum<?> name) { + return (context == null) ? null : context.getCounter(name); + } + + @Override + public Counter getCounter(String group, String name) { + return (context == null) ?
null : context.getCounter(group, name); + } + + @Override + public void progress() { + if (context != null) { + context.progress(); + } + } + + @Override + public void setStatus(String status) { + if (context != null) { + context.setStatus(status); + } + } + +}