Author: gates Date: Tue Dec 1 14:38:11 2009 New Revision: 885772 URL: http://svn.apache.org/viewvc?rev=885772&view=rev Log: PIG-1074 Zebra store function should allow '::' in column names in output schema.
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaPrimitive.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=885772&r1=885771&r2=885772&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original) +++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Tue Dec 1 14:38:11 2009 @@ -8,6 +8,9 @@ IMPROVEMENTS + PIG-1074 Zebra store function should allow '::' in column names in output + schema (yanz via gates) + PIG-1077 Support record(row)-based file split in Zebra's TableInputFormat (chaow via gates) Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt?rev=885772&r1=885771&r2=885772&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt Tue Dec 1 14:38:11 2009 @@ -68,7 +68,8 @@ <#LETTER : ["a"-"z", "A"-"Z"] > | <#DIGIT : ["0"-"9"] > | <#SPECIALCHAR : ["_", ".", "#"] > -| <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* > +| <#SCOPEOP : "::"> +| <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* ( <SCOPEOP> ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )*)* > } ColumnType Type() : Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt?rev=885772&r1=885771&r2=885772&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt Tue Dec 1 14:38:11 2009 @@ -73,7 +73,8 @@ | <#OCTAL : ["0" - "7"] > | <#SPECIALCHAR : ["_"] > | <#FSSPECIALCHAR: ["-", ":", "/"]> -| <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* > +| <#SCOPEOP : "::"> +| <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* ( <SCOPEOP> ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )*)* > | <SHORT : (<OCTAL>){3} > } Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java?rev=885772&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java (added) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCogroup.java Tue Dec 1 14:38:11 2009 @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.zebra.pig; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Iterator; +import java.util.StringTokenizer; + +import junit.framework.Assert; +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.zebra.io.BasicTable; +import org.apache.hadoop.zebra.io.TableInserter; +import org.apache.hadoop.zebra.pig.TableStorer; +import org.apache.hadoop.zebra.schema.Schema; +import org.apache.hadoop.zebra.types.TypesUtils; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.data.Tuple; +import org.apache.pig.test.MiniCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Checks that TableStorer accepts Pig's disambiguated '::' column names (e.g. records3::SF_a) produced by COGROUP and FLATTEN. + * + * Note: make sure you add the build/pig-0.1.0-dev-core.jar to the classpath of the app/debug configuration when running this from inside Eclipse.
+ * + */ +public class TestCogroup { + protected static ExecType execType = ExecType.MAPREDUCE; + private static MiniCluster cluster; + protected static PigServer pigServer; + private static Path pathTable; + private static Configuration conf; + private static FileSystem fs; + private static String zebraJar; + private static String whichCluster; + + @BeforeClass + public static void setUp() throws Exception { + if (System.getProperty("hadoop.log.dir") == null) { + String base = new File(".").getPath(); // getAbsolutePath(); + System + .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs"); + } + + // if whichCluster is not defined, or defined something other than + // "realCluster" or "miniCluster", set it to "miniCluster" + if (System.getProperty("whichCluster") == null + || ((!System.getProperty("whichCluster") + .equalsIgnoreCase("realCluster")) && (!System.getProperty( + "whichCluster").equalsIgnoreCase("miniCluster")))) { + System.setProperty("whichCluster", "miniCluster"); + whichCluster = System.getProperty("whichCluster"); + } else { + whichCluster = System.getProperty("whichCluster"); + } + + System.out.println("cluster: " + whichCluster); + if (whichCluster.equalsIgnoreCase("realCluster") + && System.getenv("HADOOP_HOME") == null) { + System.out.println("Please set HADOOP_HOME"); + System.exit(0); + } + + conf = new Configuration(); + + if (whichCluster.equalsIgnoreCase("realCluster") + && System.getenv("USER") == null) { + System.out.println("Please set USER"); + System.exit(0); + } + zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar"; + File file = new File(zebraJar); + if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) { + System.out.println("Please put zebra.jar at hadoop_home/../jars"); + System.exit(0); + } + + if (whichCluster.equalsIgnoreCase("realCluster")) { + pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil + .toProperties(conf)); + pigServer.registerJar(zebraJar); + pathTable = new
Path("/user/" + System.getenv("USER") + + "/TestCogroup"); // table directory named after this test class so it is not shared with other suites + fs = pathTable.getFileSystem(conf); + } + + if (whichCluster.equalsIgnoreCase("miniCluster")) { + if (execType == ExecType.MAPREDUCE) { + cluster = MiniCluster.buildCluster(); + pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + fs = cluster.getFileSystem(); + pathTable = new Path(fs.getWorkingDirectory() + "/TestCogroup"); // table directory named after this test class so it is not shared with other suites + System.out.println("path1 =" + pathTable); + } else { + pigServer = new PigServer(ExecType.LOCAL); + } + } + BasicTable.Writer writer = new BasicTable.Writer(pathTable, + "SF_a:string,SF_b:string,SF_c,SF_d,SF_e,SF_f,SF_g", + "[SF_a, SF_b, SF_c]; [SF_e, SF_f, SF_g]", conf); + Schema schema = writer.getSchema(); + Tuple tuple = TypesUtils.createTuple(schema); + + final int numsBatch = 10; + final int numsInserters = 1; + TableInserter[] inserters = new TableInserter[numsInserters]; + for (int i = 0; i < numsInserters; i++) { + inserters[i] = writer.getInserter("ins" + i, false); + } + + for (int b = 0; b < numsBatch; b++) { + for (int i = 0; i < numsInserters; i++) { + TypesUtils.resetTuple(tuple); + for (int k = 0; k < tuple.size(); ++k) { + try { + tuple.set(k, (9-b) + "_" + i + "" + k); + } catch (ExecException e) { + e.printStackTrace(); + } + } + inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple); + } + } + for (int i = 0; i < numsInserters; i++) { + inserters[i].close(); + } + writer.close(); + } + + @AfterClass + public static void tearDown() throws Exception { + pigServer.shutdown(); + } + + /** + * Returns the qualified name (package.Class.method) of the method that called getCurrentMethodName, parsed from the third line of a stack trace. + * + */ + public String getCurrentMethodName() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintWriter pw = new PrintWriter(baos); + (new Throwable()).printStackTrace(pw); + pw.flush(); + String stackTrace = baos.toString(); + pw.close(); + + StringTokenizer tok = new StringTokenizer(stackTrace, "\n"); + tok.nextToken();
// 'java.lang.Throwable' + tok.nextToken(); // 'at ...getCurrentMethodName' + String l = tok.nextToken(); // 'at ...<caller to getCurrentRoutine>' + // Parse line 3 + tok = new StringTokenizer(l.trim(), " <("); + String t = tok.nextToken(); // 'at' + t = tok.nextToken(); // '...<caller to getCurrentRoutine>' + return t; + } + + @Test + public void testStorer() throws ExecException, IOException { + /* + * Use pig LOAD to load testing data for store + */ + String query = "records = LOAD '" + pathTable.toString() + + "' USING org.apache.hadoop.zebra.pig.TableLoader();"; + pigServer.registerQuery(query); + + /* + Iterator<Tuple> it2 = pigServer.openIterator("records"); + int row0 = 0; + Tuple RowValue2 = null; + while (it2.hasNext()) { + // Last row value + RowValue2 = it2.next(); + row0++; + if (row0 == 10) { + Assert.assertEquals("0_01", RowValue2.get(1)); + Assert.assertEquals("0_00", RowValue2.get(0)); + } + } + Assert.assertEquals(10, row0); + */ + + String filter1 = "records3 = FILTER records BY SF_a > '4';"; + pigServer.registerQuery(filter1); + + String filter2 = "records4 = FILTER records BY SF_a > '4';"; + pigServer.registerQuery(filter2); + + String cog = "records5 = cogroup records3 by SF_a, records4 by SF_a;"; + pigServer.registerQuery(cog); + + String foreach = "records6 = foreach records5 generate flatten(records3), flatten(records4);"; + pigServer.registerQuery(foreach); + + Path newPath = new Path(getCurrentMethodName()); + + /* + * Table1 creation + */ + ExecJob pigJob = pigServer + .store( + "records6", + newPath.toString()+"1", + TableStorer.class.getCanonicalName() + + "('[records3::SF_a]; [records4::SF_a]')"); + Assert.assertNull(pigJob.getException()); + } +} Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaPrimitive.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaPrimitive.java?rev=885772&r1=885771&r2=885772&view=diff
============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaPrimitive.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestSchemaPrimitive.java Tue Dec 1 14:38:11 2009 @@ -46,7 +46,7 @@ @Test public void testSchemaValid2() throws ParseException { - String strSch = "f1:int, f2, f3:float, f4, f5:string, f6:bytes"; + String strSch = "f1:int, f2, f3:float, f4, f5:string, f6::f61, f7::f71:map"; TableSchemaParser parser; Schema schema; @@ -61,6 +61,14 @@ ColumnSchema f4 = schema.getColumn(3); Assert.assertEquals("f4", f4.getName()); Assert.assertEquals(ColumnType.BYTES, f4.getType()); + + ColumnSchema f6 = schema.getColumn(5); + Assert.assertEquals("f6::f61", f6.getName()); + Assert.assertEquals(ColumnType.BYTES, f6.getType()); + + ColumnSchema f7 = schema.getColumn(6); + Assert.assertEquals("f7::f71", f7.getName()); + Assert.assertEquals(ColumnType.MAP, f7.getType()); } /* @@ -153,4 +161,24 @@ Assert.assertEquals(errMsg.startsWith(str), true); } } + + @Test + public void testSchemaInvalid5() { // "f11" after a single ':' is an identifier, not a type, so "f1:f11" must be rejected + try { + String strSch = "f1:f11"; + TableSchemaParser parser; + Schema schema; + + parser = new TableSchemaParser(new StringReader(strSch)); + schema = parser.RecordSchema(null); + System.out.println(schema); + Assert.fail("Schema \"f1:f11\" should not parse"); // assertion Error is not caught by catch (Exception) below, so a successful parse fails the test + } catch (Exception e) { + String errMsg = e.getMessage(); + String str = "Encountered \" <IDENTIFIER> \"f11 \"\" at line 1, column 4."; + System.out.println(errMsg); + System.out.println(str); + Assert.assertEquals(errMsg.startsWith(str), true); + } + } }