Author: pradeepkth
Date: Wed Mar 17 22:44:18 2010
New Revision: 924536

URL: http://svn.apache.org/viewvc?rev=924536&view=rev
Log:
PIG-1287: Use hadoop-0.20.2 with pig 0.7.0 release (pradeepkth)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/build.xml
    hadoop/pig/trunk/lib/hadoop20.jar
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
    
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Mar 17 22:44:18 2010
@@ -68,6 +68,8 @@ manner (rding via pradeepkth)
 
 IMPROVEMENTS
 
+PIG-1287: Use hadoop-0.20.2 with pig 0.7.0 release (pradeepkth)
+
 PIG-1257: PigStorage per the new load-store redesign should support splitting
 of bzip files (pradeepkth)
 

Modified: hadoop/pig/trunk/build.xml
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Wed Mar 17 22:44:18 2010
@@ -533,7 +533,6 @@
                     <!-- Excluded under Windows.-->
                     <exclude name="**/TestHBaseStorage.java" if="isWindows" />
                     <!-- Excluced because we don't want to run them -->
-                    <exclude name="**/TestCounters.java" />
                     <exclude name="**/PigExecTestCase.java" />
                     <exclude name="**/TypeCheckingTestUtil.java" />
                     <exclude name="**/TypeGraphPrinter.java" />

Modified: hadoop/pig/trunk/lib/hadoop20.jar
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/lib/hadoop20.jar?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
Binary files - no diff available.

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Wed Mar 17 22:44:18 2010
@@ -325,12 +325,13 @@ public class MapReduceLauncher extends L
         // scripts mess up the stats reporting from hadoop.
         List<String> rji = stats.getRootJobIDs();
         if ( (rji != null && rji.size() == 1 && finalStores == 1) || 
pc.getExecType() == ExecType.LOCAL ) {
-            if(stats.getRecordsWritten()==-1) {
+            // currently counters are not working in local mode - see PIG-1286
+            if(stats.getRecordsWritten()==-1 || pc.getExecType() == 
ExecType.LOCAL) {
                 log.info("Records written : Unable to determine number of 
records written");
             } else {
                 log.info("Records written : " + stats.getRecordsWritten());
             }
-            if(stats.getBytesWritten()==-1) {
+            if(stats.getBytesWritten()==-1 || pc.getExecType() == 
ExecType.LOCAL) {
                 log.info("Bytes written : Unable to determine number of bytes 
written");
             } else {
                 log.info("Bytes written : " + stats.getBytesWritten());

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 Wed Mar 17 22:44:18 2010
@@ -134,7 +134,7 @@ public class PigCombiner {
 
                 PigHadoopLogger pigHadoopLogger = 
PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setReporter(context);
+                pigHadoopLogger.setTaskIOContext(context);
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
             }
             

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
 Wed Mar 17 22:44:18 2010
@@ -19,7 +19,8 @@ package org.apache.pig.backend.hadoop.ex
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 
 /**
@@ -39,7 +40,7 @@ public final class PigHadoopLogger imple
     } 
 
     private static Log log = LogFactory.getLog(PigHadoopLogger.class);
-    private Progressable reporter = null;
+    private TaskInputOutputContext<?, ?, ?, ?> taskIOContext = null;
     private boolean aggregate = false;
 
     private PigHadoopLogger() {
@@ -49,8 +50,9 @@ public final class PigHadoopLogger imple
     public void warn(Object o, String msg, Enum warningEnum) {
         String displayMessage = o.getClass().getName() + ": " + msg;
         if(aggregate) {
-            if(reporter != null) {
-                //reporter.incrCounter(warningEnum, 1);
+            if(taskIOContext != null) {
+                Counter c = taskIOContext.getCounter(warningEnum);
+                c.increment(1);
             } else {
                 //TODO:
                 //in local mode of execution if the PigHadoopLogger is used 
initially,
@@ -67,12 +69,12 @@ public final class PigHadoopLogger imple
         }
     }    
 
-    public Progressable getReporter() {
-        return reporter;
+    public TaskInputOutputContext<?, ?, ?, ?> getTaskIOContext() {
+        return taskIOContext;
     }
 
-    public synchronized void setReporter(Progressable rep) {
-        this.reporter = rep;
+    public synchronized void setTaskIOContext(TaskInputOutputContext<?, ?, ?, 
?> tioc) {
+        this.taskIOContext = tioc;
     }
     
     public boolean getAggregate() {

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 Wed Mar 17 22:44:18 2010
@@ -211,7 +211,7 @@ public abstract class PigMapBase extends
 
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
             pigHadoopLogger.setAggregate(aggregateWarning);
-            pigHadoopLogger.setReporter(context);
+            pigHadoopLogger.setTaskIOContext(context);
             PhysicalOperator.setPigLogger(pigHadoopLogger);
         }
         

Modified: 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 (original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 Wed Mar 17 22:44:18 2010
@@ -338,7 +338,7 @@ public class PigMapReduce {
 
                 PigHadoopLogger pigHadoopLogger = 
PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setReporter(context);
+                pigHadoopLogger.setTaskIOContext(context);
                 
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
 
@@ -550,7 +550,7 @@ public class PigMapReduce {
                 
                 PigHadoopLogger pigHadoopLogger = 
PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
-                pigHadoopLogger.setReporter(context);
+                pigHadoopLogger.setTaskIOContext(context);
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
                 
                 for (POStore store: stores) {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Wed Mar 17 
22:44:18 2010
@@ -513,50 +513,55 @@ public class TestCounters extends TestCa
 
     }
     
-    @Test
-    public void testLocal() throws IOException, ExecException {
-        int count = 0;
-        //PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, 
file));
-        File file = File.createTempFile("data", ".txt");
-        PrintWriter pw = new PrintWriter(new FileOutputStream(file));
-        int [] nos = new int[10];
-        for(int i = 0; i < 10; i++)
-            nos[i] = 0;
-
-        for(int i = 0; i < MAX; i++) {
-            int index = r.nextInt(10);
-            int value = r.nextInt(100);
-            nos[index] += value;
-            pw.println(index + "\t" + value);
-        }
-        pw.close();
-
-        for(int i = 0; i < 10; i++) 
-            if(nos[i] > 0)
-                count ++;
-
-        File out = File.createTempFile("output", ".txt");
-        out.delete();
-        PigServer pigServer = new PigServer("local");
-        // FileLocalizer is initialized before using HDFS by previous tests
-        FileLocalizer.setInitialized(false);
-        pigServer.registerQuery("a = load '" + 
Util.encodeEscape(file.toString()) + "';");
-        pigServer.registerQuery("b = order a by $0;");
-        pigServer.registerQuery("c = group b by $0;");
-        pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
-        PigStats pigStats = pigServer.store("d", "file://" + 
out.getAbsolutePath()).getStatistics();
-        InputStream is = 
FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), 
pigServer.getPigContext()), ExecType.MAPREDUCE, 
pigServer.getPigContext().getDfs());
-        long filesize = 0;
-        while(is.read() != -1) filesize++;
-        
-        is.close();
-        out.delete();
-        
-        //Map<String, Map<String, String>> stats = pigStats.getPigStats();
-        
-        assertEquals(10, pigStats.getRecordsWritten());
-        assertEquals(110, pigStats.getBytesWritten());
-
-    }
+    /*
+     * IMPORTANT NOTE:
+     * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
+     * SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
+     */
+//    @Test
+//    public void testLocal() throws IOException, ExecException {
+//        int count = 0;
+//        //PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, 
file));
+//        File file = File.createTempFile("data", ".txt");
+//        PrintWriter pw = new PrintWriter(new FileOutputStream(file));
+//        int [] nos = new int[10];
+//        for(int i = 0; i < 10; i++)
+//            nos[i] = 0;
+//
+//        for(int i = 0; i < MAX; i++) {
+//            int index = r.nextInt(10);
+//            int value = r.nextInt(100);
+//            nos[index] += value;
+//            pw.println(index + "\t" + value);
+//        }
+//        pw.close();
+//
+//        for(int i = 0; i < 10; i++) 
+//            if(nos[i] > 0)
+//                count ++;
+//
+//        File out = File.createTempFile("output", ".txt");
+//        out.delete();
+//        PigServer pigServer = new PigServer("local");
+//        // FileLocalizer is initialized before using HDFS by previous tests
+//        FileLocalizer.setInitialized(false);
+//        pigServer.registerQuery("a = load '" + 
Util.encodeEscape(file.toString()) + "';");
+//        pigServer.registerQuery("b = order a by $0;");
+//        pigServer.registerQuery("c = group b by $0;");
+//        pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
+//        PigStats pigStats = pigServer.store("d", "file://" + 
out.getAbsolutePath()).getStatistics();
+//        InputStream is = 
FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), 
pigServer.getPigContext()), ExecType.MAPREDUCE, 
pigServer.getPigContext().getDfs());
+//        long filesize = 0;
+//        while(is.read() != -1) filesize++;
+//        
+//        is.close();
+//        out.delete();
+//        
+//        //Map<String, Map<String, String>> stats = pigStats.getPigStats();
+//        
+//        assertEquals(10, pigStats.getRecordsWritten());
+//        assertEquals(110, pigStats.getBytesWritten());
+//
+//    }
 
 }


Reply via email to