Author: yanz
Date: Mon Jan 4 19:33:53 2010
New Revision: 895753
URL: http://svn.apache.org/viewvc?rev=895753&view=rev
Log:
PIG-1167: Hadoop file glob support (yanz)
Added:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
Modified:
hadoop/pig/trunk/contrib/zebra/CHANGES.txt
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=895753&r1=895752&r2=895753&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Mon Jan 4 19:33:53 2010
@@ -52,6 +52,8 @@
BUG FIXES
+ PIG-1167: Hadoop file glob support (yanz)
+
PIG-1153: Record split exception fix (yanz)
PIG-1145: Merge Join on Large Table throws an EOF exception (yanz)
Modified:
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=895753&r1=895752&r2=895753&view=diff
==============================================================================
---
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
(original)
+++
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
Mon Jan 4 19:33:53 2010
@@ -226,7 +226,7 @@
FileSystem fs = p.getFileSystem(jobConf);
FileStatus[] matches = fs.globStatus(p);
if (matches == null) {
- LOG.warn("Input path does not exist: "
+ p);
+ throw new IOException("Input path does
not exist: " + p);
}
else if (matches.length == 0) {
LOG.warn("Input Pattern " + p + "
matches 0 files");
@@ -293,33 +293,14 @@
Projection projection;
- if (!fileName.contains(",")) { // one table;
- org.apache.hadoop.zebra.schema.Schema tschema =
BasicTable.Reader.getSchema(new Path(fileName), jobConf);
- try {
- projection = new
org.apache.hadoop.zebra.types.Projection(tschema,
TableInputFormat.getProjection(jobConf));
- projectionSchema =
projection.getProjectionSchema();
- } catch (ParseException e) {
- throw new IOException("Schema parsing failed :
"+e.getMessage());
- }
- } else { // table union;
- org.apache.hadoop.zebra.schema.Schema unionSchema = new
org.apache.hadoop.zebra.schema.Schema();
- for (Path p : paths) {
- org.apache.hadoop.zebra.schema.Schema schema =
BasicTable.Reader.getSchema(p, jobConf);
- try {
- unionSchema.unionSchema(schema);
- } catch (ParseException e) {
- throw new IOException(e.getMessage());
- }
- }
-
- try {
- projection = new
org.apache.hadoop.zebra.types.Projection(unionSchema,
TableInputFormat.getProjection(jobConf));
- projectionSchema =
projection.getProjectionSchema();
- } catch (ParseException e) {
- throw new IOException("Schema parsing failed :
"+e.getMessage());
- }
- }
-
+ org.apache.hadoop.zebra.schema.Schema tschema =
TableInputFormat.getSchema(jobConf);
+ try {
+ projection = new org.apache.hadoop.zebra.types.Projection(tschema,
TableInputFormat.getProjection(jobConf));
+ projectionSchema = projection.getProjectionSchema();
+ } catch (ParseException e) {
+ throw new IOException("Schema parsing failed : "+e.getMessage());
+ }
+
if (projectionSchema == null) {
throw new IOException("Cannot determine table
projection schema");
}
Added:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java?rev=895753&view=auto
==============================================================================
---
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
(added)
+++
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
Mon Jan 4 19:33:53 2010
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that {@link TableLoader} accepts a Hadoop glob pattern (here a brace
+ * pattern such as <code>/dir/{name}</code>) as its input path.
+ *
+ * Note:
+ *
+ * Make sure you add the build/pig-0.1.0-dev-core.jar to the classpath of the
+ * app/debug configuration when running this from inside Eclipse.
+ *
+ */
+public class TestGlobTableLoader{
+  protected static ExecType execType = ExecType.MAPREDUCE;
+  private static MiniCluster cluster;
+  protected static PigServer pigServer;
+  private static Path pathTable;
+  private static Configuration conf;
+  private static String zebraJar;
+  private static String whichCluster;
+  private static FileSystem fs;
+
+  /** Number of insertion rounds; each round writes one row per inserter. */
+  private static final int NUM_BATCHES = 10;
+  /** Number of inserters used to populate the test table. */
+  private static final int NUM_INSERTERS = 2;
+
+  /**
+   * Picks the cluster type (defaulting to "miniCluster"), starts a PigServer,
+   * and writes a one-column map table of NUM_BATCHES * NUM_INSERTERS rows to
+   * {@code pathTable}.
+   */
+  @BeforeClass
+  public static void setUp() throws Exception {
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      // NOTE(review): "." + "./logs" presumably yields "../logs"; confirm whether "/logs" was meant.
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+    }
+
+    // if whichCluster is not defined, or defined something other than
+    // "realCluster" or "miniCluster", set it to "miniCluster"
+    String requested = System.getProperty("whichCluster");
+    if (requested == null
+        || (!requested.equalsIgnoreCase("realCluster")
+            && !requested.equalsIgnoreCase("miniCluster"))) {
+      System.setProperty("whichCluster", "miniCluster");
+    }
+    whichCluster = System.getProperty("whichCluster");
+
+    System.out.println("cluster: " + whichCluster);
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("HADOOP_HOME") == null) {
+      System.out.println("Please set HADOOP_HOME");
+      System.exit(0);
+    }
+
+    conf = new Configuration();
+
+    if (whichCluster.equalsIgnoreCase("realCluster")
+        && System.getenv("USER") == null) {
+      System.out.println("Please set USER");
+      System.exit(0);
+    }
+    zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
+    File file = new File(zebraJar);
+    // Compare against the correctly spelled cluster name so a missing jar is reported.
+    if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+      System.out.println("Please put zebra.jar at hadoop_home/../jars");
+      System.exit(0);
+    }
+
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      pigServer = new PigServer(ExecType.MAPREDUCE,
+          ConfigurationUtil.toProperties(conf));
+      pigServer.registerJar(zebraJar);
+      pathTable = new Path("/user/" + System.getenv("USER")
+          + "/TestMapTableLoader");
+      removeDir(pathTable);
+      fs = pathTable.getFileSystem(conf);
+    }
+
+    if (whichCluster.equalsIgnoreCase("miniCluster")) {
+      if (execType == ExecType.MAPREDUCE) {
+        cluster = MiniCluster.buildCluster();
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        fs = cluster.getFileSystem();
+        pathTable = new Path(fs.getWorkingDirectory()
+            + "/TestMapTableLoader1");
+        removeDir(pathTable);
+        System.out.println("path1 =" + pathTable);
+      } else {
+        pigServer = new PigServer(ExecType.LOCAL);
+      }
+    }
+
+    BasicTable.Writer writer = new BasicTable.Writer(pathTable,
+        "m1:map(string)", "[m1#{a}]", conf);
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    TableInserter[] inserters = new TableInserter[NUM_INSERTERS];
+    for (int i = 0; i < NUM_INSERTERS; i++) {
+      inserters[i] = writer.getInserter("ins" + i, false);
+    }
+
+    for (int b = 0; b < NUM_BATCHES; b++) {
+      for (int i = 0; i < NUM_INSERTERS; i++) {
+        TypesUtils.resetTuple(tuple);
+        Map<String, String> map = new HashMap<String, String>();
+        map.put("a", "x");
+        map.put("b", "y");
+        map.put("c", "z");
+        tuple.set(0, map);
+        // Let insertion failures propagate: silently dropped rows would make
+        // the row count checked in testReader meaningless.
+        inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple);
+      }
+    }
+    for (int i = 0; i < NUM_INSERTERS; i++) {
+      inserters[i].close();
+    }
+    // Close the writer too; presumably this flushes table-level metadata before reads.
+    writer.close();
+  }
+
+  /** Shuts down the PigServer, if setUp got far enough to create one. */
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (pigServer != null) {
+      pigServer.shutdown();
+    }
+  }
+
+  /**
+   * Best-effort recursive delete of {@code outPath}. On a real cluster this runs
+   * "hadoop fs -rmr"; otherwise it runs "rm -rf". The exit status is ignored,
+   * because deleting a path that does not exist is not an error here.
+   */
+  public static void removeDir(Path outPath) throws IOException {
+    ProcessBuilder builder;
+    if (whichCluster.equalsIgnoreCase("realCluster")) {
+      builder = new ProcessBuilder(System.getenv("HADOOP_HOME") + "/bin/hadoop",
+          "fs", "-rmr", outPath.toString());
+    } else {
+      builder = new ProcessBuilder("rm", "-rf", outPath.toString());
+    }
+    // Arguments are passed as separate tokens, so paths containing spaces stay intact.
+    Process proc = builder.start();
+    try {
+      proc.waitFor();
+    } catch (InterruptedException e) {
+      System.err.println(e);
+      Thread.currentThread().interrupt(); // preserve the interrupt for callers
+    }
+  }
+
+  /**
+   * Manual debugging aid: scans the first range split with projection
+   * "m1#{b}" and prints each key/value. It is not annotated with @Test, so
+   * JUnit does not run it.
+   */
+  // @Test
+  public void test1() throws IOException, ParseException {
+    String projection = "m1#{b}";
+    BasicTable.Reader reader = new BasicTable.Reader(pathTable, conf);
+    reader.setProjection(projection);
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+
+    reader = new BasicTable.Reader(pathTable, conf);
+    try {
+      reader.setProjection(projection);
+      TableScanner scanner = reader.getScanner(splits.get(0), true);
+      BytesWritable key = new BytesWritable();
+      Tuple value = TypesUtils.createTuple(scanner.getSchema());
+      while (!scanner.atEnd()) {
+        scanner.getKey(key);
+        scanner.getValue(value);
+        System.out.println("key = " + key + " value = " + value);
+        scanner.advance();
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Loads the table through a brace glob that names only the table written in
+   * setUp. It then checks that every inserted row comes back.
+   */
+  @Test
+  public void testReader() throws ExecException, IOException {
+    // Derive the glob from the path setUp actually wrote, so it matches on both
+    // cluster types, and use a local so the shared pathTable field is not overwritten.
+    String globPath = pathTable.getParent().toString()
+        + "/{" + pathTable.getName() + "}";
+    String query = "records = LOAD '" + globPath
+        + "' USING org.apache.hadoop.zebra.pig.TableLoader('m1#{a}');";
+    System.out.println(query);
+    pigServer.registerQuery(query);
+    Iterator<Tuple> it = pigServer.openIterator("records");
+    int numRows = 0;
+    while (it.hasNext()) {
+      Tuple cur = it.next();
+      System.out.println(cur);
+      ++numRows;
+    }
+    org.junit.Assert.assertEquals(NUM_BATCHES * NUM_INSERTERS, numRows);
+  }
+}
Added:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java?rev=895753&view=auto
==============================================================================
---
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
(added)
+++
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveMultiTableGlob.java
Mon Jan 4 19:33:53 2010
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.pig.TableStorer;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.MiniCluster;
+
+import junit.framework.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+/**
+ * Tests order-preserving (sorted) union of NUMB_TABLE zebra tables. All tables
+ * are loaded through a single Hadoop brace-glob path.
+ */
+public class TestOrderPreserveMultiTableGlob {
+
+    final static int NUMB_TABLE = 10;       // number of tables for stress test
+    final static int NUMB_TABLE_ROWS = 5;   // number of rows for each table
+
+    final static String TABLE_SCHEMA = "int1:int,str1:string,byte1:bytes";
+    final static String TABLE_STORAGE = "[int1,str1,byte1]";
+
+    static int fileId = 0;
+    static int sortId = 0;
+
+    protected static ExecType execType = ExecType.MAPREDUCE;
+    private static MiniCluster cluster;
+    protected static PigServer pigServer;
+    protected static ExecJob pigJob;
+
+    private static ArrayList<Path> pathTables;
+    private static int totalTableRows = 0;
+
+    private static Configuration conf;
+    private static FileSystem fs;
+
+    private static String zebraJar;
+    private static String whichCluster;
+
+    /**
+     * Picks the cluster type (defaulting to "miniCluster") and starts a
+     * PigServer. It then creates NUMB_TABLE tables of NUMB_TABLE_ROWS rows each
+     * and registers them as aliases table0 .. table(NUMB_TABLE - 1).
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        if (System.getProperty("hadoop.log.dir") == null) {
+            String base = new File(".").getPath(); // getAbsolutePath();
+            // NOTE(review): "." + "./logs" presumably yields "../logs"; confirm whether "/logs" was meant.
+            System.setProperty("hadoop.log.dir", new Path(base).toString() + "./logs");
+        }
+
+        // if whichCluster is not defined, or defined something other than
+        // "realCluster" or "miniCluster", set it to "miniCluster"
+        String requested = System.getProperty("whichCluster");
+        if (requested == null
+                || (!requested.equalsIgnoreCase("realCluster")
+                        && !requested.equalsIgnoreCase("miniCluster"))) {
+            System.setProperty("whichCluster", "miniCluster");
+        }
+        whichCluster = System.getProperty("whichCluster");
+
+        System.out.println("cluster: " + whichCluster);
+        if (whichCluster.equalsIgnoreCase("realCluster")
+                && System.getenv("HADOOP_HOME") == null) {
+            System.out.println("Please set HADOOP_HOME");
+            System.exit(0);
+        }
+
+        conf = new Configuration();
+
+        if (whichCluster.equalsIgnoreCase("realCluster")
+                && System.getenv("USER") == null) {
+            System.out.println("Please set USER");
+            System.exit(0);
+        }
+        zebraJar = System.getenv("HADOOP_HOME") + "/../jars/zebra.jar";
+        File file = new File(zebraJar);
+        // Compare against the correctly spelled cluster name so a missing jar is reported.
+        if (!file.exists() && whichCluster.equalsIgnoreCase("realCluster")) {
+            System.out.println("Please put zebra.jar at hadoop_home/../jars");
+            System.exit(0);
+        }
+
+        if (whichCluster.equalsIgnoreCase("realCluster")) {
+            pigServer = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil.toProperties(conf));
+            pigServer.registerJar(zebraJar);
+
+            pathTables = new ArrayList<Path>();
+            for (int i = 0; i < NUMB_TABLE; ++i) {
+                Path pathTable = new Path("/user/" + System.getenv("USER")
+                        + "/TestOderPerserveMultiTable" + i);
+                pathTables.add(pathTable);
+                removeDir(pathTable);
+            }
+            fs = pathTables.get(0).getFileSystem(conf);
+        }
+
+        if (whichCluster.equalsIgnoreCase("miniCluster")) {
+            if (execType == ExecType.MAPREDUCE) {
+                cluster = MiniCluster.buildCluster();
+                pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+                fs = cluster.getFileSystem();
+
+                pathTables = new ArrayList<Path>();
+                for (int i = 0; i < NUMB_TABLE; ++i) {
+                    Path pathTable = new Path(fs.getWorkingDirectory()
+                            + "/TestOderPerserveMultiTable" + i);
+                    pathTables.add(pathTable);
+                    removeDir(pathTable);
+                }
+            } else {
+                pigServer = new PigServer(ExecType.LOCAL);
+            }
+        }
+
+        // Create tables: table i has int1 == i on every row.
+        for (int i = 0; i < NUMB_TABLE; ++i) {
+            Object[][] table = new Object[NUMB_TABLE_ROWS][3]; // three columns
+
+            for (int j = 0; j < NUMB_TABLE_ROWS; ++j) {
+                table[j][0] = i;
+                table[j][1] = new String("string" + j);
+                table[j][2] = new DataByteArray("byte" + (NUMB_TABLE_ROWS - j));
+                ++totalTableRows;
+            }
+            createTable(pathTables.get(i), TABLE_SCHEMA, TABLE_STORAGE, table);
+
+            // Register the table as Pig alias "table<i>"
+            String query = "table" + i + " = LOAD '" + pathTables.get(i).toString()
+                    + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+            pigServer.registerQuery(query);
+        }
+    }
+
+    /**
+     * Writes {@code tableData} to a new zebra table at {@code path} through a
+     * single inserter. Row i is inserted with key "key" + i.
+     */
+    private static void createTable(Path path, String schemaString, String storageString,
+            Object[][] tableData) throws IOException {
+        BasicTable.Writer writer = new BasicTable.Writer(path, schemaString, storageString, conf);
+
+        Schema schema = writer.getSchema();
+        Tuple tuple = TypesUtils.createTuple(schema);
+        TableInserter inserter = writer.getInserter("ins", false);
+
+        for (int i = 0; i < tableData.length; ++i) {
+            TypesUtils.resetTuple(tuple);
+            for (int k = 0; k < tableData[i].length; ++k) {
+                tuple.set(k, tableData[i][k]);
+            }
+            inserter.insert(new BytesWritable(("key" + i).getBytes()), tuple);
+        }
+        inserter.close();
+        writer.close();
+    }
+
+    /** Shuts down the PigServer, if setUp got far enough to create one. */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        if (pigServer != null) {
+            pigServer.shutdown();
+        }
+    }
+
+    /**
+     * Best-effort recursive delete of {@code outPath}. On a real cluster this
+     * runs "hadoop fs -rmr"; otherwise it runs "rm -rf". The exit status is
+     * ignored.
+     */
+    public static void removeDir(Path outPath) throws IOException {
+        ProcessBuilder builder;
+        if (whichCluster.equalsIgnoreCase("realCluster")) {
+            builder = new ProcessBuilder(System.getenv("HADOOP_HOME") + "/bin/hadoop",
+                    "fs", "-rmr", outPath.toString());
+        } else {
+            builder = new ProcessBuilder("rm", "-rf", outPath.toString());
+        }
+        // Arguments are passed as separate tokens, so paths containing spaces stay intact.
+        Process proc = builder.start();
+        try {
+            proc.waitFor();
+        } catch (InterruptedException e) {
+            System.err.println(e);
+            Thread.currentThread().interrupt(); // preserve the interrupt for callers
+        }
+    }
+
+    /**
+     * Sorts each input alias by {@code sortkey} and stores each result as its
+     * own zebra table. It then registers alias "records1" as a sorted load of
+     * all stored tables through one brace-glob path.
+     *
+     * @param inputTables Pig aliases to union (at least two)
+     * @param sortkey     ORDER BY key applied to each input
+     * @param columns     projection passed to TableLoader
+     * @return an iterator over "records1"
+     */
+    private Iterator<Tuple> testOrderPreserveUnion(ArrayList<String> inputTables, String sortkey,
+            String columns) throws IOException {
+        Assert.assertTrue("Table union requires two or more input tables", inputTables.size() >= 2);
+
+        Path newPath = new Path(getCurrentMethodName());
+        // Suffixes actually appended to newPath. The glob is built from these, so
+        // it matches the stored paths whatever value the static fileId started at.
+        ArrayList<Integer> suffixes = new ArrayList<Integer>();
+
+        // Sort and store each of the input tables
+        for (String tablename : inputTables) {
+            String sortName = "sort" + ++sortId;
+
+            String orderby = sortName + " = ORDER " + tablename + " BY " + sortkey + " ;";
+            pigServer.registerQuery(orderby);
+
+            int suffix = ++fileId;
+            String sortPath = newPath.toString() + suffix;
+
+            pigJob = pigServer.store(sortName, sortPath, TableStorer.class.getCanonicalName()
+                    + "('" + TABLE_STORAGE + "')");
+            Assert.assertNull(pigJob.getException());
+
+            suffixes.add(suffix);
+        }
+
+        // Build "<newPath>{s1,s2,...}"
+        StringBuilder paths = new StringBuilder(newPath.toString()).append('{');
+        for (int i = 0; i < suffixes.size(); ++i) {
+            if (i > 0) {
+                paths.append(',');
+            }
+            paths.append(suffixes.get(i));
+        }
+        paths.append('}');
+
+        String queryLoad = "records1 = LOAD '"
+                + paths
+                + "' USING org.apache.hadoop.zebra.pig.TableLoader('" + columns + "', 'sorted');";
+
+        System.out.println("queryLoad: " + queryLoad);
+        pigServer.registerQuery(queryLoad);
+
+        return pigServer.openIterator("records1");
+    }
+
+    /** Sorted union of all NUMB_TABLE tables, verified row by row. */
+    @Test
+    public void test_sorted_union_multi_table() throws ExecException, IOException {
+        ArrayList<String> inputTables = new ArrayList<String>();
+        for (int i = 0; i < NUMB_TABLE; ++i) {
+            inputTables.add("table" + i);
+        }
+
+        // Use the returned iterator rather than re-opening "records1", which
+        // would run the union job a second time.
+        Iterator<Tuple> it = testOrderPreserveUnion(inputTables, "int1", "int1, str1, byte1");
+
+        // Expected rows, grouped by the primary sort key int1
+        ArrayList<ArrayList<Object>> resultTable = new ArrayList<ArrayList<Object>>();
+        for (int i = 0; i < NUMB_TABLE; ++i) {
+            for (int j = 0; j < NUMB_TABLE_ROWS; ++j) {
+                ArrayList<Object> resultRow = new ArrayList<Object>();
+                resultRow.add(i);                                             // int1
+                resultRow.add(new String("string" + j));                      // str1
+                resultRow.add(new DataByteArray("byte" + (NUMB_TABLE_ROWS - j))); // byte1
+                resultTable.add(resultRow);
+            }
+        }
+
+        int numbRows = verifyTable(resultTable, 0, it);
+        Assert.assertEquals(totalTableRows, numbRows);
+
+        // Print Table
+        //printTable("records1");
+    }
+
+    /**
+     * Verifies union output against expected rows.
+     *
+     * Output rows must appear grouped by the key in {@code keyColumn}, in the
+     * same key order as {@code resultTable}. Within a key group, rows may come
+     * in any order, provided each one has an equal row at or after the start of
+     * the current key group.
+     *
+     * @return the number of rows read from {@code it}
+     */
+    private int verifyTable(ArrayList<ArrayList<Object>> resultTable, int keyColumn,
+            Iterator<Tuple> it) throws IOException {
+        int numbRows = 0;
+        int index = 0;
+        Object value = resultTable.get(index).get(keyColumn); // current primary key
+
+        while (it.hasNext()) {
+            Tuple rowValues = it.next();
+
+            // If the current primary sort key does not match, advance to the next key
+            if (!compareObj(value, rowValues.get(keyColumn))) {
+                int subIndex = index + 1;
+                while (subIndex < resultTable.size()) {
+                    if (!compareObj(value, resultTable.get(subIndex).get(keyColumn))) { // found new key
+                        index = subIndex;
+                        value = resultTable.get(index).get(keyColumn);
+                        break;
+                    }
+                    ++subIndex;
+                }
+                Assert.assertEquals("Table comparison error for row : " + numbRows
+                        + " - no key found for : " + rowValues.get(keyColumn),
+                        value, rowValues.get(keyColumn));
+            }
+
+            // Search for a matching row with this primary key
+            String noMatch = "Table comparison error for row : " + numbRows
+                    + " - no matching row found for : " + rowValues.get(keyColumn);
+            int subIndex = index;
+            while (subIndex < resultTable.size()) {
+                if (compareRow(rowValues, resultTable.get(subIndex))) {
+                    break; // found matching row
+                }
+                ++subIndex;
+                // Fail with a message instead of an IndexOutOfBoundsException
+                // when the expected rows are exhausted.
+                Assert.assertTrue(noMatch, subIndex < resultTable.size());
+                Assert.assertEquals(noMatch, value, resultTable.get(subIndex).get(keyColumn));
+            }
+            ++numbRows;
+        }
+        Assert.assertEquals(resultTable.size(), numbRows); // verify expected row count
+        return numbRows;
+    }
+
+    /** Returns true when every column of {@code rowValues} equals {@code resultRow}. */
+    private boolean compareRow(Tuple rowValues, ArrayList<Object> resultRow) throws IOException {
+        Assert.assertEquals(resultRow.size(), rowValues.size());
+        for (int i = 0; i < rowValues.size(); ++i) {
+            if (!compareObj(rowValues.get(i), resultRow.get(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Null-safe equality: two nulls are equal, otherwise object1.equals(object2). */
+    private boolean compareObj(Object object1, Object object2) {
+        return object1 == null ? object2 == null : object1.equals(object2);
+    }
+
+    /** Prints every row of a Pig alias (debugging aid) and returns the row count. */
+    private int printTable(String tablename) throws IOException {
+        Iterator<Tuple> it1 = pigServer.openIterator(tablename);
+        int numbRows = 0;
+        while (it1.hasNext()) {
+            Tuple RowValue1 = it1.next();
+            ++numbRows;
+            System.out.println();
+            for (int i = 0; i < RowValue1.size(); ++i) {
+                System.out.println("DEBUG: " + tablename + " RowValue.get(" + i + ") = "
+                        + RowValue1.get(i));
+            }
+        }
+        System.out.println("\nRow count : " + numbRows);
+        return numbRows;
+    }
+
+    /**
+     * Returns "fully.qualified.ClassName.methodName" of the routine that called
+     * getCurrentMethodName.
+     */
+    private String getCurrentMethodName() {
+        // Element 0 is this method; element 1 is its caller.
+        StackTraceElement caller = new Throwable().getStackTrace()[1];
+        return caller.getClassName() + "." + caller.getMethodName();
+    }
+
+}
Modified:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java?rev=895753&r1=895752&r2=895753&view=diff
==============================================================================
---
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java
(original)
+++
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestOrderPreserveProjectionNegative.java
Mon Jan 4 19:33:53 2010
@@ -455,7 +455,7 @@
} finally {
//System.out.println(getStackTrace(exception));
Assert.assertNotNull(exception);
-
Assert.assertTrue(getStackTrace(exception).contains("Schema file doesn't
exist"));
+ Assert.assertTrue(getStackTrace(exception).contains("Input path does not
exist: "));
}
}
@@ -465,6 +465,7 @@
// Test sorted union error handling when one of the table paths
is invalid (Negative test)
//
IOException exception = null;
+ String pathSort2 = null;
try {
// Sort tables
@@ -479,7 +480,7 @@
"('" + TABLE1_STORAGE + "')");
Assert.assertNull(pigJob.getException());
- String pathSort2 = newPath.toString() + "2"; //
invalid path
+ pathSort2 = newPath.toString() + "2"; // invalid path
String queryLoad = "records1 = LOAD '"
+ pathSort1 + ","