Author: gates
Date: Tue Dec  1 14:38:11 2009
New Revision: 885772

URL: http://svn.apache.org/viewvc?rev=885772&view=rev
Log:
PIG-1074 Zebra store function should allow '::' in column names in output 
schema.

Added:
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaPrimitive.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=885772&r1=885771&r2=885772&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Tue Dec  1 14:38:11 2009
@@ -8,6 +8,9 @@
 
   IMPROVEMENTS
 
+       PIG-1074 Zebra store function should allow '::' in column names in output
+       schema (yanz via gates)
+
     PIG-1077 Support record(row)-based file split in Zebra's
        TableInputFormat (chaow via gates)
 

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt?rev=885772&r1=885771&r2=885772&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt
 Tue Dec  1 14:38:11 2009
@@ -68,7 +68,8 @@
        <#LETTER : ["a"-"z", "A"-"Z"] >
 |      <#DIGIT : ["0"-"9"] >
 |   <#SPECIALCHAR : ["_", ".", "#"] >
-|      <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* >
+| <#SCOPEOP : "::">
+|      <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* ( 
<SCOPEOP>  ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )*)* >
 }
 
 ColumnType Type() : 

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt?rev=885772&r1=885771&r2=885772&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
 Tue Dec  1 14:38:11 2009
@@ -73,7 +73,8 @@
 |   <#OCTAL : ["0" - "7"] >
 |   <#SPECIALCHAR : ["_"] >
 |   <#FSSPECIALCHAR: ["-", ":", "/"]>
-|      <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* >
+| <#SCOPEOP : "::">
+|      <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* ( 
<SCOPEOP>  ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )*)* >
 |   <SHORT     :       (<OCTAL>){3}    >
 }
 

Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java?rev=885772&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java
 Tue Dec  1 14:38:11 2009
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Note:
+ * 
+ * Make sure you add build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this test from inside Eclipse.
+ * 
+ */
+public class TestCogroup {
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static String zebraJar;
+  private static String whichCluster;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System
+          .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    // if whichCluster is not defined, or defined something other than
+    // "realCluster" or "miniCluster", set it to "miniCluster"
+    if (System.getProperty("whichCluster") == null
+        || ((!System.getProperty("whichCluster")
+            .equalsIgnoreCase("realCluster")) && (!System.getProperty(
+            "whichCluster").equalsIgnoreCase("miniCluster")))) {
+      System.setProperty("whichCluster", "miniCluster");
+      whichCluster = System.getProperty("whichCluster");
+    } else {
+      whichCluster = System.getProperty("whichCluster");
+    }
+
+    System.out.println("cluster: " + whichCluster);
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("HADOOP_HOME") == null) {
+      System.out.println("Please set HADOOP_HOME");
+      System.exit(0);
+    }
+
+    conf = new Configuration();
+
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("USER") == null) {
+      System.out.println("Please set USER");
+      System.exit(0);
+    }
+    zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
+    File file = new File(zebraJar);
+    if (!file.exists() && whichCluster.equalsIgnoreCase("realCulster")) {
+      System.out.println("Please put zebra.jar at hadoop_home/../jars");
+      System.exit(0);
+    }
+
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil
+          .toProperties(conf));
+      pigServer.registerJar(zebraJar);
+      pathTable = new Path("/user/" + System.getenv("USER")
+          + "/TestTableMergeJoinAfterFilter");
+      fs = pathTable.getFileSystem(conf);
+    }
+
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      if (execType == ExecType.MAPREDUCE) {
+        cluster = MiniCluster.buildCluster();
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        fs = cluster.getFileSystem();
+        pathTable = new Path(fs.getWorkingDirectory() + 
"/TestTableMergeJoinAfterFilter");
+        System.out.println("path1 =" + pathTable);
+      } else {
+        pigServer = new PigServer(ExecType.LOCAL);
+      }
+    }
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "SF_a:string,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g",
+        "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    final int numsBatch = 10;
+    final int numsInserters = 1;
+    TableInserter[] inserters = new TableInserter[numsInserters];
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < numsBatch; b++) {
+      for (int i = 0; i < numsInserters; i++) {
+        TypesUtils.resetTuple(tuple);
+        for (int k = 0; k < tuple.size(); ++k) {
+          try {
+            tuple.set(k, (9-b) + "_" + i + "" + k);
+          } catch (ExecException e) {
+            e.printStackTrace();
+          }
+        }
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < numsInserters; i++) {
+      inserters[i].close();
+    }
+    writer.close();
+  }
+
+  /**
+   * Shuts down the shared Pig server once all tests in this class have run.
+   *
+   * @throws Exception if the shutdown fails
+   */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    // setUp may have failed before the server was created; do not mask that
+    // failure with a NullPointerException here.
+    if (pigServer != null) {
+      pigServer.shutdown();
+    }
+  }
+
+  /**
+   * Returns the name of the method that called getCurrentMethodName, in the
+   * form fully-qualified-class-name + "." + method-name.
+   *
+   * @return the caller's qualified name, for example
+   *         "org.apache.hadoop.zebra.pig.TestCogroup.testStorer"
+   */
+  public String getCurrentMethodName() {
+    // Frame 0 is this method itself; frame 1 is its caller. Reading the
+    // structured frames avoids parsing the printed stack-trace text.
+    StackTraceElement caller = new Throwable().getStackTrace()[1];
+    return caller.getClassName() + "." + caller.getMethodName();
+  }
+
+  /**
+   * Stores the flattened result of a cogroup through TableStorer using a
+   * storage hint whose column names contain '::', and checks that the store
+   * job reports no exception.
+   */
+  @Test
+  public void testStorer() throws ExecException, IOException {
+    // Load the table written by setUp.
+    pigServer.registerQuery("records = LOAD '" + pathTable.toString()
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader();");
+
+    /*
+     * Disabled sanity check of the loaded rows, kept for reference:
+     *
+    Iterator<Tuple> it2 = pigServer.openIterator("records");
+    int row0 = 0;
+    Tuple RowValue2 = null;
+    while (it2.hasNext()) {
+      // Last row value
+      RowValue2 = it2.next();
+      row0++;
+      if (row0 == 10) {
+        Assert.assertEquals("0_01", RowValue2.get(1));
+        Assert.assertEquals("0_00", RowValue2.get(0));
+      }
+    }
+    Assert.assertEquals(10, row0);
+    */
+
+    // Two identically filtered aliases are cogrouped on SF_a and flattened;
+    // the storage hint below refers to their columns as records3::SF_a and
+    // records4::SF_a.
+    pigServer.registerQuery("records3 = FILTER records BY SF_a > '4';");
+    pigServer.registerQuery("records4 = FILTER records BY SF_a > '4';");
+    pigServer.registerQuery(
+        "records5 = cogroup records3 by SF_a, records4 by SF_a;");
+    pigServer.registerQuery(
+        "records6 = foreach records5 generate flatten(records3), "
+            + "flatten(records4);");
+
+    // The output location is this method's qualified name plus a "1" suffix.
+    String outputLocation = new Path(getCurrentMethodName()).toString() + "1";
+    String storerSpec = TableStorer.class.getCanonicalName()
+        + "('[records3::SF_a]; [records4::SF_a]')";
+
+    ExecJob storeJob = pigServer.store("records6", outputLocation, storerSpec);
+    Assert.assertNull(storeJob.getException());
+  }
+}

Modified: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaPrimitive.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaPrimitive.java?rev=885772&r1=885771&r2=885772&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaPrimitive.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaPrimitive.java
 Tue Dec  1 14:38:11 2009
@@ -46,7 +46,7 @@
 
   @Test
   public void testSchemaValid2() throws ParseException {
-    String strSch = "f1:int, f2, f3:float, f4, f5:string, f6:bytes";
+    String strSch = "f1:int, f2, f3:float, f4, f5:string, f6::f61, 
f7::f71:map";
     TableSchemaParser parser;
     Schema schema;
 
@@ -61,6 +61,14 @@
     ColumnSchema f4 = schema.getColumn(3);
     Assert.assertEquals("f4", f4.getName());
     Assert.assertEquals(ColumnType.BYTES, f4.getType());
+    
+    ColumnSchema f6 = schema.getColumn(5);
+    Assert.assertEquals("f6::f61", f6.getName());
+    Assert.assertEquals(ColumnType.BYTES, f6.getType());
+    
+    ColumnSchema f7 = schema.getColumn(6);
+    Assert.assertEquals("f7::f71", f7.getName());
+    Assert.assertEquals(ColumnType.MAP, f7.getType());
   }
 
   /*
@@ -153,4 +161,23 @@
       Assert.assertEquals(errMsg.startsWith(str), true);
     }
   }
+  
+  /**
+   * Verifies that a single ':' is not accepted in place of the '::' scope
+   * operator: parsing "f1:f11" must fail with a message reporting the
+   * unexpected identifier f11 at line 1, column 4.
+   */
+  @Test
+  public void testSchemaInvalid5() {
+    try {
+      String strSch = "f1:f11";
+      TableSchemaParser parser;
+      Schema schema;
+
+      parser = new TableSchemaParser(new StringReader(strSch));
+      schema = parser.RecordSchema(null);
+      System.out.println(schema);
+      // Without this the test would pass silently if the parser accepted the
+      // input. The failure is an Error, so the catch below does not trap it.
+      Assert.fail("Expected a parse failure for schema \"" + strSch + "\"");
+    } catch (Exception e) {
+      String errMsg = e.getMessage();
+      String str = "Encountered \" <IDENTIFIER> \"f11 \"\" at line 1, "
+          + "column 4.";
+      System.out.println(errMsg);
+      System.out.println(str);
+      Assert.assertTrue("Unexpected parser message: " + errMsg,
+          errMsg.startsWith(str));
+    }
+  }
 }


Reply via email to