Author: pradeepkth
Date: Thu Nov 19 17:16:21 2009
New Revision: 882208

URL: http://svn.apache.org/viewvc?rev=882208&view=rev
Log:
Fixes for a couple more unit tests

Modified:
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
    
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java
    
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline.java
    
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=882208&r1=882207&r2=882208&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
 Thu Nov 19 17:16:21 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 /**
@@ -78,9 +79,19 @@
      * @return
      * @throws IOException 
      */
+    @SuppressWarnings("unchecked")
     private List<OutputCommitter> getCommitters(TaskAttemptContext context,
             String storeLookupKey) throws IOException {
         Configuration conf = context.getConfiguration();
+        
+        // if there is a udf in the plan we would need to know the import
+        // path so we can instantiate the udf. This is required because
+        // we will be deserializing the POStores out of the plan in the next
+        // line below. The POStore in turn has a member reference to the
+        // Physical plan it is part of - so the deserialization goes deep and
+        // while deserializing the plan, the udf.import.list may be needed.
+        PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.
+                deserialize(conf.get("udf.import.list")));
         LinkedList<POStore> stores = (LinkedList<POStore>) ObjectSerializer.
         deserialize(conf.get(storeLookupKey));
         List<OutputCommitter> committers = new ArrayList<OutputCommitter>();

Modified: 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java?rev=882208&r1=882207&r2=882208&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java 
(original)
+++ 
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java 
Thu Nov 19 17:16:21 2009
@@ -17,21 +17,27 @@
  */
 package org.apache.pig.impl.io;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 
 
-// XXX: FIXME: make this work with load store redesign
 
 public class PigFile {
     private String file = null;
@@ -48,11 +54,10 @@
     
     public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException {
         DataBag content = BagFactory.getInstance().newDefaultBag();
-        InputStream is = FileLocalizer.open(file, pigContext);
-        //XXX FIXME: make this work with new load-store redesign
-//        lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+        ReadToEndLoader loader = new ReadToEndLoader(lfunc, 
+                ConfigurationUtil.toConfiguration(pigContext.getProperties()), file, 0);
         Tuple f = null;
-        while ((f = lfunc.getNext()) != null) {
+        while ((f = loader.getNext()) != null) {
             content.add(f);
         }
         return content;
@@ -60,14 +65,36 @@
 
     
     public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException {
-        BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext));
-//        sfunc.bindTo(bos);
-        for (Iterator<Tuple> it = data.iterator(); it.hasNext();) {
-            Tuple row = it.next();
-            sfunc.putNext(row);
+        Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties());
+        // create a simulated JobContext
+        JobContext jc = new JobContext(conf, new JobID());
+        OutputFormat<?,?> of = sfunc.getOutputFormat();
+        PigOutputFormat.setLocation(jc, sfunc, file);
+        OutputCommitter oc;
+        // create a simulated TaskAttemptContext
+        TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+        PigOutputFormat.setLocation(tac, sfunc, file);
+        RecordWriter<?,?> rw ;
+        try {
+            of.checkOutputSpecs(jc);
+            oc = of.getOutputCommitter(tac);
+            oc.setupJob(jc);
+            oc.setupTask(tac);
+            rw = of.getRecordWriter(tac);
+            sfunc.prepareToWrite(rw);
+        
+            for (Iterator<Tuple> it = data.iterator(); it.hasNext();) {
+                Tuple row = it.next();
+                sfunc.putNext(row);
+            }
+            rw.close(tac);
+        } catch (InterruptedException e) {
+            throw new IOException(e);
         }
-//        sfunc.finish();
-        bos.close();
+        if(oc.needsTaskCommit(tac)) {
+            oc.commitTask(tac);
+        }
+        oc.cleanupJob(jc);
     }
 
     public String toString() {

Modified: 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline.java?rev=882208&r1=882207&r2=882208&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline.java
 Thu Nov 19 17:16:21 2009
@@ -160,18 +160,21 @@
         t.append(weights);
         b.add(t);
         
-        String fileName = "file:"+File.createTempFile("tmp", "");
+        File tmpFile = File.createTempFile("tmp", "");
+        tmpFile.deleteOnExit();
+        String fileName = tmpFile.getAbsolutePath();
         PigFile f = new PigFile(fileName);
         f.store(b, new BinStorage(), pigServer.getPigContext());
         
         
-        pigServer.registerQuery("a = load '" + Util.encodeEscape(fileName) + "' using BinStorage();");
+        pigServer.registerQuery("a = load '" + fileName + "' using BinStorage();");
 pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
         Iterator<Tuple> iter = pigServer.openIterator("b");
         t = iter.next();
         assertEquals(t.get(0).toString(), "red");
         assertEquals(DataType.toDouble(t.get(1)), 0.3);
         assertFalse(iter.hasNext());
+        Util.deleteFile(cluster, fileName);
     }
     
     static public class TitleNGrams extends EvalFunc<DataBag> {

Modified: 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java?rev=882208&r1=882207&r2=882208&view=diff
==============================================================================
--- 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java
 (original)
+++ 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java
 Thu Nov 19 17:16:21 2009
@@ -109,8 +109,6 @@
         tmpDir.delete();
         tmpDir.mkdir();
         
-        File tempDir = new File(tmpDir.getAbsolutePath());
-        Util.deleteDirectory(tempDir);
 File udf1Dir = new File(tmpDir.getAbsolutePath()+FILE_SEPARATOR+"com"+FILE_SEPARATOR+"xxx"+FILE_SEPARATOR+"udf1");
         udf1Dir.mkdirs();
 File udf2Dir = new File(tmpDir.getAbsolutePath()+FILE_SEPARATOR+"com"+FILE_SEPARATOR+"xxx"+FILE_SEPARATOR+"udf2");
@@ -172,18 +170,18 @@
 
         int LOOP_COUNT = 40;
         File tmpFile = File.createTempFile("test", "txt");
-        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        tmpFile.deleteOnExit();
+        String input[] = new String[LOOP_COUNT];
         Random r = new Random(1);
         int rand;
         for(int i = 0; i < LOOP_COUNT; i++) {
             rand = r.nextInt(100);
-            ps.println(rand);
+            input[i] = Integer.toString(rand);
         }
-        ps.close();
-        
+        Util.createInputFile(cluster, tmpFile.getCanonicalPath(), input);
         FileLocalizer.deleteTempFiles();
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-        pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString()) + "' using TestUDF2() AS (num:chararray);");
+        pigServer.registerQuery("A = LOAD '" + tmpFile.getCanonicalPath() + "' using TestUDF2() AS (num:chararray);");
         pigServer.registerQuery("B = foreach A generate TestUDF1(num);");
         Iterator<Tuple> iter = pigServer.openIterator("B");
         if(!iter.hasNext()) fail("No output found");
@@ -192,8 +190,8 @@
             assertTrue(t.get(0) instanceof Integer);
             assertTrue((Integer)t.get(0) == 1);
         }
-        
-        Util.deleteDirectory(tempDir);
+        Util.deleteFile(cluster, tmpFile.getCanonicalPath());
+        Util.deleteDirectory(tmpDir);
     }
 
     @After


Reply via email to