Added: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TokenMgrError.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TokenMgrError.java?rev=803312&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TokenMgrError.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TokenMgrError.java
 Tue Aug 11 22:27:44 2009
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Generated By:JavaCC: Do not edit this line. TokenMgrError.java Version 4.1 
*/
+/* JavaCCOptions: */
+package org.apache.hadoop.zebra.types;
+
+/** Token Manager Error. */
+public class TokenMgrError extends Error
+{
+
+  /**
+   * The version identifier for this Serializable class.
+   * Increment only if the <i>serialized</i> form of the
+   * class changes.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /*
+   * Ordinals for various reasons why an Error of this type can be thrown.
+   */
+
+  /**
+   * Lexical error occurred.
+   */
+  static final int LEXICAL_ERROR = 0;
+
+  /**
+   * An attempt was made to create a second instance of a static token manager.
+   */
+  static final int STATIC_LEXER_ERROR = 1;
+
+  /**
+   * Tried to change to an invalid lexical state.
+   */
+  static final int INVALID_LEXICAL_STATE = 2;
+
+  /**
+   * Detected (and bailed out of) an infinite loop in the token manager.
+   */
+  static final int LOOP_DETECTED = 3;
+
+  /**
+   * Indicates the reason why the exception is thrown. It will have
+   * one of the above 4 values.
+   */
+  int errorCode;
+
+  /**
+   * Replaces unprintable characters by their escaped (or unicode escaped)
+   * equivalents in the given string
+   */
+  protected static final String addEscapes(String str) {
+    StringBuffer retval = new StringBuffer();
+    char ch;
+    for (int i = 0; i < str.length(); i++) {
+      switch (str.charAt(i))
+      {
+        case 0 :
+          continue;
+        case '\b':
+          retval.append("\\b");
+          continue;
+        case '\t':
+          retval.append("\\t");
+          continue;
+        case '\n':
+          retval.append("\\n");
+          continue;
+        case '\f':
+          retval.append("\\f");
+          continue;
+        case '\r':
+          retval.append("\\r");
+          continue;
+        case '\"':
+          retval.append("\\\"");
+          continue;
+        case '\'':
+          retval.append("\\\'");
+          continue;
+        case '\\':
+          retval.append("\\\\");
+          continue;
+        default:
+          if ((ch = str.charAt(i)) < 0x20 || ch > 0x7e) {
+            String s = "0000" + Integer.toString(ch, 16);
+            retval.append("\\u" + s.substring(s.length() - 4, s.length()));
+          } else {
+            retval.append(ch);
+          }
+          continue;
+      }
+    }
+    return retval.toString();
+  }
+
+  /**
+   * Returns a detailed message for the Error when it is thrown by the
+   * token manager to indicate a lexical error.
+   * Parameters :
+   *    EOFSeen     : indicates if EOF caused the lexical error
+   *    curLexState : lexical state in which this error occurred
+   *    errorLine   : line number when the error occurred
+   *    errorColumn : column number when the error occurred
+   *    errorAfter  : prefix that was seen before this error occurred
+   *    curchar     : the offending character
+   * Note: You can customize the lexical error message by modifying this 
method.
+   */
+  protected static String LexicalError(boolean EOFSeen, int lexState, int 
errorLine, int errorColumn, String errorAfter, char curChar) {
+    return("Lexical error at line " +
+          errorLine + ", column " +
+          errorColumn + ".  Encountered: " +
+          (EOFSeen ? "<EOF> " : ("\"" + addEscapes(String.valueOf(curChar)) + 
"\"") + " (" + (int)curChar + "), ") +
+          "after : \"" + addEscapes(errorAfter) + "\"");
+  }
+
+  /**
+   * You can also modify the body of this method to customize your error 
messages.
+   * For example, cases like LOOP_DETECTED and INVALID_LEXICAL_STATE are not
+   * of end-users concern, so you can return something like :
+   *
+   *     "Internal Error : Please file a bug report .... "
+   *
+   * from this method for such cases in the release version of your parser.
+   */
+  public String getMessage() {
+    return super.getMessage();
+  }
+
+  /*
+   * Constructors of various flavors follow.
+   */
+
+  /** No arg constructor. */
+  public TokenMgrError() {
+  }
+
+  /** Constructor with message and reason. */
+  public TokenMgrError(String message, int reason) {
+    super(message);
+    errorCode = reason;
+  }
+
+  /** Full Constructor. */
+  public TokenMgrError(boolean EOFSeen, int lexState, int errorLine, int 
errorColumn, String errorAfter, char curChar, int reason) {
+    this(LexicalError(EOFSeen, lexState, errorLine, errorColumn, errorAfter, 
curChar), reason);
+  }
+}
+/* JavaCC - OriginalChecksum=3a929d89e9ba57096ed151b8a005959f (do not edit 
this line) */

Added: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java?rev=803312&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
 Tue Aug 11 22:27:44 2009
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.zebra.types;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * Utility methods manipulating Table types (speicifically, Tuple objects).
+ */
+public class TypesUtils {
+  static TupleFactory tf = DefaultTupleFactory.getInstance();
+
+  /**
+   * Create a tuple based on a schema
+   * 
+   * @param schema
+   *          The schema that the tuple will conform to.
+   * @return A suitable Tuple object that can be used to read or write a Table
+   *         with the same input or output schema.
+   */
+  public static Tuple createTuple(Schema schema) throws IOException {
+    Tuple tuple = tf.newTuple(schema.getNumColumns());
+    for (int i = 0; i < schema.getNumColumns(); ++i) {
+      tuple.set(i, null);
+    }
+    return tuple;
+  }
+  
+  /**
+   * create a tuple based on number of columns
+   */
+  public static Tuple createTuple(int size) throws IOException {
+    Tuple tuple = tf.newTuple(size);
+    for (int i = 0; i < size; ++i) {
+      tuple.set(i, null);
+    }
+    return tuple;
+  }
+
+  /**
+   * Create a PIG Bag object.
+   * 
+   * @return A Pig DataBag object.
+   */
+  public static DataBag createBag() {
+    return new DefaultDataBag();
+  }
+
+  //TODO - sync up with yan about this change
+  public static DataBag createBag(Schema schema) {
+           return new DefaultDataBag();
+  }
+
+  /**
+   * Reset the Tuple so that all fields are NULL field. This is different from
+   * clearing the tuple, in which case the size of the tuple will become zero.
+   * 
+   * @param tuple
+   *          Input tuple.
+   */
+  public static void resetTuple(Tuple tuple) {
+    try {
+      for (int i = 0; i < tuple.size(); ++i) {
+        tuple.set(i, null);
+      }
+    }
+    catch (Exception e) {
+      throw new RuntimeException("Internal error: " + e.toString());
+    }
+  }
+
+  /**
+   * Check whether the input row object is compatible with the expected schema
+   * 
+   * @param tuple
+   *          Input Tuple object
+   * @param schema
+   *          Table schema
+   * @throws IOException
+   */
+  public static void checkCompatible(Tuple tuple, Schema schema)
+      throws IOException {
+    // TODO: Add more rigorous checking.
+    if (tuple.size() != schema.getNumColumns()) {
+      throw new IOException("Incompatible Tuple object - number of fields");
+    }
+  }
+
+  /**
+   * Reading a tuple from disk with projection.
+   */
+  public static class TupleReader {
+    private Tuple tuple;
+    @SuppressWarnings("unused")
+    private Schema physical;
+    private Projection projection;
+    SubColumnExtraction.SubColumn subcolextractor = null;
+
+    /**
+     * Constructor - create a TupleReader than can parse the serialized Tuple
+     * with the specified physical schema, and produce the Tuples based on the
+     * projection.
+     * 
+     * @param physical
+     *          The physical schema of on-disk data.
+     * @param projection
+     *          The logical schema of tuples user expect.
+     */
+    public TupleReader(Schema physical, Projection projection)
+        throws IOException, ParseException {
+      tuple = createTuple(physical);
+      this.physical = physical;
+      this.projection = projection;
+      subcolextractor = new SubColumnExtraction.SubColumn(physical, 
projection);
+      subcolextractor.dispatchSource(tuple);
+    }
+
+    public Schema getSchema() {
+      return physical;
+    }
+
+    public Projection getprojction() {
+      return projection;
+    }
+
+    /**
+     * Read a tuple from the stream, and perform projection.
+     * 
+     * @param in
+     *          The input stream
+     * @param row
+     *          The input tuple that should conform to the projection schema.
+     * @throws IOException
+     */
+    public void get(DataInputStream in, Tuple row) throws IOException, 
ParseException {
+      checkCompatible(row, projection.getSchema());
+      tuple.readFields(in);
+      TypesUtils.resetTuple(row);
+      try {
+        subcolextractor.splitColumns(row);
+      }
+      catch (ExecException e) {
+        // not going to happen.
+      }
+    }
+  }
+
+  /**
+   * Writing a tuple to disk.
+   */
+  public static class TupleWriter {
+    private Schema physical;
+
+    /**
+     * The constructor
+     * 
+     * @param physical
+     *          The physical schema of the tuple.
+     */
+    public TupleWriter(Schema physical) {
+      this.physical = physical;
+    }
+
+    /**
+     * Write a tuple to the output stream.
+     * 
+     * @param out
+     *          The output stream
+     * @param row
+     *          The user tuple that should conform to the physical schema.
+     * @throws IOException
+     */
+    public void put(DataOutputStream out, Tuple row) throws IOException {
+      checkCompatible(row, physical);
+      row.write(out);
+    }
+  }
+
+  /**
+   * Checking and formatting an input tuple to conform to the input schema.<br>
+   * 
+   *           The current implementation always create a new tuple because PIG
+   *           expects Slice.next(tuple) always returning a brand new tuple.
+   * 
+   * @param tuple
+   * @throws IOException
+   * 
+   */
+  public static void formatTuple(Tuple tuple, String projection) throws 
IOException {
+    Tuple one = createTuple(Projection.getNumColumns(projection));
+    tuple.reference(one);
+    return;
+    /*
+     * Dead code below.
+     */
+    // int n = schema.getNumColumns();
+    // if (tuple.size() == n) return;
+    // if (tuple.size() == 0) {
+    // for (int i = 0; i < schema.getNumColumns(); ++i) {
+    // tuple.append(null);
+    // }
+    // return;
+    // }
+    // throw new IOException("Tuple already formatted with " + tuple.size()
+    // + "  fields");
+  }
+}

Added: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/package-info.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/package-info.java?rev=803312&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/package-info.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/package-info.java
 Tue Aug 11 22:27:44 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+/**
+ * Data types being shared between the io and mapred packages.
+ */
+package org.apache.hadoop.zebra.types;

Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=803312&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
 Tue Aug 11 22:27:44 2009
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.BasicTableStatus;
+import org.apache.hadoop.zebra.io.KeyDistribution;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBasicTable {
+  public static Configuration conf;
+  public static Random random;
+  public static Path rootPath;
+  public static FileSystem fs;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+    random = new Random(System.nanoTime());
+    rootPath = new Path(System.getProperty("test.build.data",
+        "build/test/data/work-dir"));
+    fs = rootPath.getFileSystem(conf);
+  }
+
+  @BeforeClass
+  public static void tearDownOnce() throws IOException {
+  }
+
+  BytesWritable makeRandomKey(int max) {
+    return makeKey(random.nextInt(max));
+  }
+
+  BytesWritable makeKey(int i) {
+    return new BytesWritable(String.format("key%09d", i).getBytes());
+  }
+
+  String makeString(String prefix, int max) {
+    return String.format("%s%09d", prefix, random.nextInt(max));
+  }
+
+  int createBasicTable(int parts, int rows, String strSchema, String storage,
+      Path path, boolean properClose, boolean sorted) throws IOException {
+    if (fs.exists(path)) {
+      BasicTable.drop(path, conf);
+    }
+
+    BasicTable.Writer writer = new BasicTable.Writer(path, strSchema, storage,
+        sorted, conf);
+    writer.finish();
+
+    int total = 0;
+    Schema schema = writer.getSchema();
+    String colNames[] = schema.getColumns();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    for (int i = 0; i < parts; ++i) {
+      writer = new BasicTable.Writer(path, conf);
+      TableInserter inserter = writer.getInserter(
+          String.format("part-%06d", i), true);
+      if (rows > 0) {
+        int actualRows = random.nextInt(rows) + rows / 2;
+        for (int j = 0; j < actualRows; ++j, ++total) {
+          BytesWritable key;
+          if (!sorted) {
+            key = makeRandomKey(rows * 10);
+          } else {
+            key = makeKey(total);
+          }
+          TypesUtils.resetTuple(tuple);
+          for (int k = 0; k < tuple.size(); ++k) {
+            try {
+              tuple.set(k, makeString("col-" + colNames[k], rows * 10));
+            } catch (ExecException e) {
+              e.printStackTrace();
+            }
+          }
+          inserter.insert(key, tuple);
+        }
+      }
+      inserter.close();
+    }
+
+    if (properClose) {
+      writer = new BasicTable.Writer(path, conf);
+      writer.close();
+      BasicTableStatus status = getStatus(path);
+      Assert.assertEquals(total, status.getRows());
+    }
+
+    return total;
+  }
+
+  void rangeSplitBasicTable(int numSplits, int totalRows, String strProjection,
+      Path path) throws IOException, ParseException {
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(strProjection);
+    long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(numSplits);
+    reader.close();
+    int total = 0;
+    for (int i = 0; i < splits.size(); ++i) {
+      reader = new BasicTable.Reader(path, conf);
+      reader.setProjection(strProjection);
+      total += doReadOnly(reader.getScanner(splits.get(i), true));
+      totalBytes -= reader.getBlockDistribution(splits.get(i)).getLength();
+    }
+    Assert.assertEquals(total, totalRows);
+    Assert.assertEquals(0L, totalBytes);
+    // TODO: verify tuples contains the right projected values
+  }
+
+  void doRangeSplit(int[] numSplits, int totalRows, String projection, Path 
path)
+      throws IOException, ParseException {
+    for (int i : numSplits) {
+      if (i > 0) {
+        rangeSplitBasicTable(i, totalRows, projection, path);
+      }
+    }
+  }
+
+  void keySplitBasicTable(int numSplits, int totalRows, String strProjection,
+      Path path) throws IOException, ParseException {
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(strProjection);
+    long totalBytes = reader.getStatus().getSize();
+    KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10);
+    Assert.assertEquals(totalBytes, keyDistri.length());
+    reader.close();
+    BytesWritable[] keys = null;
+    if (keyDistri.size() >= numSplits) {
+      keyDistri.resize(numSplits);
+      Assert.assertEquals(totalBytes, keyDistri.length());
+      RawComparable[] rawComparables = keyDistri.getKeys();
+      keys = new BytesWritable[rawComparables.length];
+      for (int i = 0; i < keys.length; ++i) {
+        keys[i] = new BytesWritable();
+        keys[i].setSize(rawComparables[i].size());
+        System.arraycopy(rawComparables[i].buffer(),
+            rawComparables[i].offset(), keys[i].get(), 0, rawComparables[i]
+                .size());
+      }
+    } else {
+      int targetSize = Math.min(totalRows / 10, numSplits);
+      // revert to manually cooked up keys.
+      Set<Integer> keySets = new TreeSet<Integer>();
+      while (keySets.size() < targetSize) {
+        keySets.add(random.nextInt(totalRows));
+      }
+      keys = new BytesWritable[targetSize];
+      if (!keySets.isEmpty()) {
+        int j = 0;
+        for (int i : keySets.toArray(new Integer[keySets.size()])) {
+          keys[j] = makeKey(i);
+          ++j;
+        }
+      }
+    }
+
+    int total = 0;
+    for (int i = 0; i < keys.length; ++i) {
+      reader = new BasicTable.Reader(path, conf);
+      reader.setProjection(strProjection);
+      BytesWritable begin = (i == 0) ? null : keys[i - 1];
+      BytesWritable end = (i == keys.length - 1) ? null : keys[i];
+      total += doReadOnly(reader.getScanner(begin, end, true));
+    }
+    Assert.assertEquals(total, totalRows);
+  }
+
+  void doKeySplit(int[] numSplits, int totalRows, String projection, Path path)
+      throws IOException, ParseException {
+    for (int i : numSplits) {
+      if (i > 0) {
+        keySplitBasicTable(i, totalRows, projection, path);
+      }
+    }
+  }
+
+  BasicTableStatus getStatus(Path path) throws IOException {
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    try {
+      return reader.getStatus();
+    } finally {
+      reader.close();
+    }
+  }
+
+  void doReadWrite(Path path, int parts, int rows, String schema,
+      String storage, String projection, boolean properClose, boolean sorted)
+      throws IOException, ParseException {
+    int totalRows = createBasicTable(parts, rows, schema, storage, path,
+        properClose, sorted);
+    if (rows == 0) {
+      Assert.assertEquals(rows, 0);
+    }
+
+    doRangeSplit(new int[] { 1, 2, parts / 2, parts, 2 * parts }, totalRows,
+        projection, path);
+    if (sorted) {
+      doKeySplit(new int[] { 1, 2, parts / 2, parts, 2 * parts, 10 * parts },
+          totalRows, projection, path);
+    }
+  }
+
+  public void testMultiCGs() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestBasicTableMultiCGs");
+    doReadWrite(path, 2, 100, "SF_a,SF_b,SF_c,SF_d,SF_e", 
"[SF_a,SF_b,SF_c];[SF_d,SF_e]", "SF_f,SF_a,SF_c,SF_d", true, false);
+  }
+
+  public void testCornerCases() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestBasicTableCornerCases");
+    doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", false, false);
+    doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", true, false);
+    doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", true, true);
+    doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", false, false);
+    doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, false);
+    doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, true);
+  }
+
+  int doReadOnly(TableScanner scanner) throws IOException, ParseException {
+    int total = 0;
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+    for (; !scanner.atEnd(); scanner.advance()) {
+      ++total;
+      switch (random.nextInt() % 4) {
+      case 0:
+        scanner.getKey(key);
+        break;
+      case 1:
+        scanner.getValue(value);
+        break;
+      case 2:
+        scanner.getKey(key);
+        scanner.getValue(value);
+        break;
+      default: // no-op.
+      }
+    }
+    scanner.close();
+
+    return total;
+  }
+
+  @Test
+  public void testNullSplits() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestBasicTableNullSplits");
+    int totalRows = createBasicTable(2, 250, "a, b, c", "", path, true, true);
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection("a,d,c,f");
+    Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, false)));
+    Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, null,
+        false)));
+    reader.close();
+  }
+
+  @Test
+  public void testNegativeSplits() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestNegativeSplits");
+    int totalRows = createBasicTable(2, 250, "a, b, c", "", path, true, true);
+    rangeSplitBasicTable(-1, totalRows, "a,d,c,f", path);
+  }
+
+  @Test
+  public void testMetaBlocks() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestBasicTableMetaBlocks");
+    createBasicTable(3, 100, "a, b, c", "", path, false, false);
+    BasicTable.Writer writer = new BasicTable.Writer(path, conf);
+    BytesWritable meta1 = makeKey(1234);
+    BytesWritable meta2 = makeKey(9876);
+    DataOutputStream dos = writer.createMetaBlock("testMetaBlocks.meta1");
+    try {
+      meta1.write(dos);
+    } finally {
+      dos.close();
+    }
+    dos = writer.createMetaBlock("testMetaBlocks.meta2");
+    try {
+      meta2.write(dos);
+    } finally {
+      dos.close();
+    }
+    writer.close();
+
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection("a,d,c,f");
+    BytesWritable tmp = new BytesWritable();
+    DataInputStream dis = reader.getMetaBlock("testMetaBlocks.meta1");
+    try {
+      tmp.readFields(dis);
+      Assert.assertTrue(tmp.compareTo(meta1) == 0);
+    } finally {
+      dis.close();
+    }
+
+    dis = reader.getMetaBlock("testMetaBlocks.meta2");
+    try {
+      tmp.readFields(dis);
+      Assert.assertTrue(tmp.compareTo(meta2) == 0);
+    } finally {
+      dis.close();
+    }
+    reader.close();
+  }
+
+  @Test
+  public void testNormalCases() throws IOException, ParseException {
+    Path path = new Path(rootPath, "TestBasicTableNormal");
+    doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", false, false);
+    doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", true, false);
+    doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", true, true);
+  }
+}

Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java?rev=803312&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
 Tue Aug 11 22:27:44 2009
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * Test projections on complicated column types.
+ * 
+ */
+public class TestBasicTableMapSplits {
+  final static String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), 
m:map(string), c:collection(f13:double, f14:float, f15:bytes)";
+  final static String STR_STORAGE = "[r.f12, f1, m#{b}]; [m#{a}, r.f11]";
+  private static Configuration conf;
+  private static Path path;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), "TestBasicTableMapSplits");
+    fs = path.getFileSystem(conf);
+    // drop any previous tables
+    BasicTable.drop(path, conf);
+
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    tuple.set(0, true);
+
+    Tuple tupRecord;
+    try {
+      tupRecord = TypesUtils.createTuple(schema.getColumnSchema("r")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+    tupRecord.set(0, 1);
+    tupRecord.set(1, 1001L);
+    tuple.set(1, tupRecord);
+
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("a", "x");
+    map.put("b", "y");
+    map.put("c", "z");
+    tuple.set(2, map);
+
+    DataBag bagColl = TypesUtils.createBag();
+    Schema schColl = schema.getColumn(3).getSchema();
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+    byte[] abs1 = new byte[3];
+    byte[] abs2 = new byte[4];
+    tupColl1.set(0, 3.1415926);
+    tupColl1.set(1, 1.6);
+    abs1[0] = 11;
+    abs1[1] = 12;
+    abs1[2] = 13;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 123.456789);
+    tupColl2.set(1, 100);
+    abs2[0] = 21;
+    abs2[1] = 22;
+    abs2[2] = 23;
+    abs2[3] = 24;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(3, bagColl);
+
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+    inserter.close();
+    writer1.finish();
+
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+  }
+
+  public void testDescribse() throws IOException {
+
+    BasicTable.dumpInfo(path.toString(), System.out, conf);
+
+  }
+
+  @Test
+  public void test1() throws IOException, ParseException {
+    String projection = new String("r.f12, f1");
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(value);
+
+    Assert.assertEquals(1001L, value.get(0));
+    Assert.assertEquals(true, value.get(1));
+    reader.close();
+  }
+
+  @Test
+  public void testStitch1() throws IOException, ParseException {
+    String projection = new String("f1, m#{a|b}, r, m#{c}");
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(value);
+
+    Tuple recordTuple = (Tuple) value.get(2);
+    Assert.assertEquals(1, recordTuple.get(0));
+    Assert.assertEquals(1001L, recordTuple.get(1));
+    Assert.assertEquals(true, value.get(0));
+
+    HashMap<String, Object> mapval = (HashMap<String, Object>) value.get(1);
+    Assert.assertEquals("x", mapval.get("a"));
+    Assert.assertEquals("y", mapval.get("b"));
+    Assert.assertEquals(null, mapval.get("c"));
+    mapval = (HashMap<String, Object>) value.get(3);
+    Assert.assertEquals("z", mapval.get("c"));
+    Assert.assertEquals(null, mapval.get("a"));
+    Assert.assertEquals(null, mapval.get("b"));
+    reader.close();
+  }
+
+  @Test
+  public void testStitch2() throws IOException, ParseException {
+    String projection = new String("f1, m#{a}, r, m#{c}, m");
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(value);
+
+    Tuple recordTuple = (Tuple) value.get(2);
+    Assert.assertEquals(1, recordTuple.get(0));
+    Assert.assertEquals(1001L, recordTuple.get(1));
+    Assert.assertEquals(true, value.get(0));
+
+    Map<String, Object> mapval = (Map<String, Object>) value.get(1);
+    Assert.assertEquals("x", mapval.get("a"));
+    mapval = (Map<String, Object>) value.get(3);
+    Assert.assertEquals("z", mapval.get("c"));
+
+    Map<String, Object> map = (Map<String, Object>) (value.get(4));
+    Assert.assertEquals("x", (String) map.get("a"));
+    Assert.assertEquals("y", (String) map.get("b"));
+    Assert.assertEquals("z", (String) map.get("c"));
+    reader.close();
+  }
+}

Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java?rev=803312&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java
 Tue Aug 11 22:27:44 2009
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestBasicTableProjections {
+  private static Configuration conf;
+  private static Path path;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), "TestBasicTableProjections");
+    fs = path.getFileSystem(conf);
+    // drop any previous tables
+    BasicTable.drop(path, conf);
+
+    BasicTable.Writer writer = new BasicTable.Writer(path, "a,b,c,d,e,f,g",
+        "[a,b,c];[d,e,f,g]", false, conf);
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    // String[] colNames = schema.getColumns();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    // BytesWritable key;
+    int parts = 2;
+    for (int part = 0; part < parts; part++) {
+      writer = new BasicTable.Writer(path, conf);
+      TableInserter inserter = writer.getInserter("part" + part, true);
+      TypesUtils.resetTuple(tuple);
+      for (int row = 0; row < 2; row++) {
+        try {
+          for (int nx = 0; nx < tuple.size(); nx++)
+            tuple.set(nx, String.format("%c%d%d", 'a' + nx, part + 1, row + 
1));
+        } catch (ExecException e) {
+          e.printStackTrace();
+        }
+        inserter.insert(new BytesWritable(String.format("k%d%d", part + 1,
+            row + 1).getBytes()), tuple);
+      }
+      inserter.close();
+      writer.finish();
+    }
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+  }
+
+  @Test
+  public void test1() throws IOException, ParseException {
+    String projection = new String("f,a");
+    Configuration conf = new Configuration();
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(value);
+    Assert.assertEquals("f11", value.get(0));
+    Assert.assertEquals("a11", value.get(1));
+    try {
+      value.get(2);
+      Assert.fail("Failed to catch out of boundary exceptions.");
+    } catch (ExecException e) {
+      // no op, expecting out of bounds exceptions
+    }
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    TypesUtils.resetTuple(value);
+    scanner.getValue(value);
+    Assert.assertEquals("f12", value.get(0));
+    Assert.assertEquals("a12", value.get(1));
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k21".getBytes()));
+    TypesUtils.resetTuple(value);
+    scanner.getValue(value);
+    Assert.assertEquals("f21", value.get(0));
+    Assert.assertEquals("a21", value.get(1));
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k22".getBytes()));
+    TypesUtils.resetTuple(value);
+    scanner.getValue(value);
+    Assert.assertEquals("f22", value.get(0));
+    Assert.assertEquals("a22", value.get(1));
+  }
+
+}

Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java?rev=803312&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java
 Tue Aug 11 22:27:44 2009
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * 
+ * Test projections on complicated column types.
+ * 
+ */
+public class TestBasicTableSplits {
+  final static String STR_SCHEMA = "f1:bool, r:record(f11:int, f12:long), 
m:map(string), c:collection(f13:double, f14:float, f15:bytes)";
+  // TODO: try map hash split later
+  final static String STR_STORAGE = "[r.f12, f1]; [m]";
+  private static Configuration conf;
+  private static Path path;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    conf = new Configuration();
+    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+    conf.setInt("table.input.split.minSize", 64 * 1024);
+    conf.set("table.output.tfile.compression", "none");
+
+    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
+    fs = new LocalFileSystem(rawLFS);
+    path = new Path(fs.getWorkingDirectory(), "TestBasicTableSplits");
+    fs = path.getFileSystem(conf);
+    // drop any previous tables
+    BasicTable.drop(path, conf);
+
+    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
+        STR_STORAGE, false, conf);
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    tuple.set(0, true);
+
+    Tuple tupRecord;
+    try {
+      tupRecord = TypesUtils.createTuple(schema.getColumnSchema("r")
+          .getSchema());
+    } catch (ParseException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+
+    // row 1
+    tupRecord.set(0, 1);
+    tupRecord.set(1, 1001L);
+    tuple.set(1, tupRecord);
+
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("a", "x");
+    map.put("b", "y");
+    map.put("c", "z");
+    tuple.set(2, map);
+
+    DataBag bagColl = TypesUtils.createBag();
+    Schema schColl = schema.getColumn(3).getSchema();
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+    byte[] abs1 = new byte[3];
+    byte[] abs2 = new byte[4];
+    tupColl1.set(0, 3.1415926);
+    tupColl1.set(1, 1.6);
+    abs1[0] = 11;
+    abs1[1] = 12;
+    abs1[2] = 13;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 123.456789);
+    tupColl2.set(1, 100);
+    abs2[0] = 21;
+    abs2[1] = 22;
+    abs2[2] = 23;
+    abs2[3] = 24;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(3, bagColl);
+
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    // row 2
+    row++;
+    TypesUtils.resetTuple(tuple);
+    TypesUtils.resetTuple(tupRecord);
+    map.clear();
+    tuple.set(0, false);
+    tupRecord.set(0, 2);
+    tupRecord.set(1, 1002L);
+    tuple.set(1, tupRecord);
+    map.put("boy", "girl");
+    map.put("adam", "amy");
+    map.put("bob", "becky");
+    map.put("carl", "cathy");
+    tuple.set(2, map);
+    bagColl.clear();
+    TypesUtils.resetTuple(tupColl1);
+    TypesUtils.resetTuple(tupColl2);
+    tupColl1.set(0, 7654.321);
+    tupColl1.set(1, 0.0001);
+    abs1[0] = 31;
+    abs1[1] = 32;
+    abs1[2] = 33;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 0.123456789);
+    tupColl2.set(1, 0.3333);
+    abs2[0] = 41;
+    abs2[1] = 42;
+    abs2[2] = 43;
+    abs2[3] = 44;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(3, bagColl);
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+
+    inserter.close();
+    writer1.finish();
+
+    writer.close();
+  }
+
+  @AfterClass
+  public static void tearDownOnce() throws IOException {
+  }
+
+  @Test
+  public void test1() throws IOException, ParseException {
+    String projection = new String("r.f12, f1");
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(value);
+
+    Assert.assertEquals(1001L, value.get(0));
+    Assert.assertEquals(true, value.get(1));
+
+    scanner.advance();
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
+    scanner.getValue(value);
+    Assert.assertEquals(1002L, value.get(0));
+    Assert.assertEquals(false, value.get(1));
+
+    reader.close();
+  }
+
+  @Test
+  public void testStitch() throws IOException, ParseException {
+    String projection = new String("f1, r");
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    // long totalBytes = reader.getStatus().getSize();
+
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    reader.close();
+    reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(splits.get(0), true);
+    BytesWritable key = new BytesWritable();
+    Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getKey(key);
+    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+    scanner.getValue(value);
+
+    Tuple recordTuple = (Tuple) value.get(1);
+    Assert.assertEquals(1, recordTuple.get(0));
+    Assert.assertEquals(1001L, recordTuple.get(1));
+    Assert.assertEquals(true, value.get(0));
+    reader.close();
+  }
+}

Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java?rev=803312&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
 Tue Aug 11 22:27:44 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.io;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+...@runwith(Suite.class)
+...@suite.suiteclasses({
+  TestBasicTable.class,
+  TestBasicTableMapSplits.class,
+  TestBasicTableProjections.class,
+  TestBasicTableSplits.class,
+  TestCollection.class,
+  TestColumnGroupInserters.class,
+  TestColumnGroup.class,
+  TestColumnGroupOpen.class,
+  TestColumnGroupProjections.class,
+  TestColumnGroupReaders.class,
+  TestColumnGroupSchemas.class,
+  TestColumnGroupSplits.class,
+  TestMap.class,
+  TestMapOfRecord.class,
+  TestMixedType1.class,
+  TestNegative.class,
+  TestRecord2Map.class,
+  TestRecord3Map.class,
+  TestRecord.class,
+  TestRecordMap.class,
+  TestSchema.class,
+  TestSimple.class,
+  TestWrite.class  
+})
+
+public class TestCheckin {
+  // the class remains completely empty, 
+  // being used only as a holder for the above annotations
+}


Reply via email to