Author: gates
Date: Fri Oct 16 22:15:59 2009
New Revision: 826110

URL: http://svn.apache.org/viewvc?rev=826110&view=rev
Log:
PIG-993 Ability to drop a column group in a table.


Added:
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestColumnSecurity.java
Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/build.xml
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
    
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Fri Oct 16 22:15:59 2009
@@ -5,6 +5,8 @@
   INCOMPATIBLE CHANGES
 
   IMPROVEMENTS
+       
+       PIG-993 Ability to drop a column group in a table (yanz and rangadi via 
gates)
 
     PIG-992 Separate schema related files into a schema package (yanz via
        gates)

Modified: hadoop/pig/trunk/contrib/zebra/build.xml
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/build.xml?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/build.xml (original)
+++ hadoop/pig/trunk/contrib/zebra/build.xml Fri Oct 16 22:15:59 2009
@@ -73,7 +73,7 @@
   <target name="javadoc" depends="jar">
          <mkdir dir="${build.javadoc}" />
          <javadoc packagenames="org.apache.pig.*" 
overview="${src.docs.dir}/overview.html" destdir="${build.javadoc}" 
author="true" version="true" use="true" windowtitle="Hadoop Zebra API" 
doctitle="Hadoop Zebra API" bottom="Copyright &amp;copy; ${year} The Apache 
Software Foundation">
-                 <packageset dir="${src.dir}" 
excludes="**/examples,**/comparator/,**/io/" />
+                 <packageset dir="${src.dir}" excludes="**/examples" />
                  <link href="${javadoc.link}" />
           <classpath refid="classpath"/>
                  <!--
@@ -126,10 +126,16 @@
       <classpath refid="test.classpath"/>
       <formatter type="${test.junit.output.format}" />
 
-      <batchtest todir="${build.test}">
+      <batchtest todir="${build.test}"  unless="testcase">
         <fileset dir="${src.test}"
                  includes="**/TestCheckin*.java" 
excludes="**/${test.exclude}.java" />
       </batchtest>
+      <batchtest todir="${build.test}"  if="testcase">
+        <fileset dir="${src.test}"
+                 includes="**/${testcase}.java"/>
+      </batchtest>
+
+
     </junit>
     <fail if="tests.failed">Tests failed!</fail>
   </target>

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
 Fri Oct 16 22:15:59 2009
@@ -22,15 +22,21 @@
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.StringReader;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -68,6 +74,9 @@
  * </ul>
  */
 public class BasicTable {
+  
+  static Log LOG = LogFactory.getLog(BasicTable.class);
+  
   // name of the BasicTable schema file
   private final static String BT_SCHEMA_FILE = ".btschema";
   // schema version
@@ -80,12 +89,114 @@
   // default comparator to "memcmp"
   private final static String DEFAULT_COMPARATOR = TFile.COMPARATOR_MEMCMP;
 
+  private final static String DELETED_CG_PREFIX = ".deleted-";
+  
   // no public ctor for instantiating a BasicTable object
   private BasicTable() {
     // no-op
   }
 
   /**
+   * Deletes the data for column group specified by cgName.
+   * Readers that subsequently try to read fields that were stored in the
+   * column group get null, since the underlying data is removed.
+   * <br> <br>
+   * 
+   * The effect on readers that are currently reading from the table while
+   * a column group is dropped is unspecified. Suggested practice is to
+   * drop column groups when there are no readers or writers for the table.
+   * <br> <br>
+   * 
+   * Column group names are usually specified in the "storage hint" while
+   * creating a table. If no name is specified, system assigns a simple name.
+   * These names could be obtained through "dumpInfo()" and other methods.
+   * <br> <br> 
+   *
+   * Dropping a column group that has already been removed is a no-op; no
+   * exception is thrown.
+   * 
+   * @param path path to BasicTable
+   * @param conf Configuration determines file system and other parameters.
+   * @param cgName name of the column group to drop.
+   * @throws IOException IOException could occur for various reasons. E.g.
+   *         a user does not have permissions to write to table directory.
+   *         
+   */
+  public static void dropColumnGroup(Path path, Configuration conf,
+                                     String cgName) 
+                                     throws IOException {
+    
+    FileSystem fs = FileSystem.get(conf);
+    
+    SchemaFile schemaFile = new SchemaFile(path, conf);
+    
+    int cgIdx = schemaFile.getCGByName(cgName);
+    if (cgIdx < 0) {
+      throw new IOException(path + 
+             " : Could not find a column group with the name '" + cgName + 
"'");
+    }
+    
+    Path cgPath = new Path(path, schemaFile.getName(cgIdx));
+    
+    //Clean up any previous unfinished attempts to drop column groups?
+    
+    if (schemaFile.isCGDeleted(cgIdx)) {
+      // Clean up unfinished delete if it exists. so that clean up can 
+      // complete if the previous deletion was interrupted for some reason.
+      if (fs.exists(cgPath)) {
+        LOG.info(path + " : " + 
+                 " clearing unfinished deletion of column group " +
+                 cgName + ".");
+        fs.delete(cgPath, true);
+      }
+      LOG.info(path + " : column group " + cgName + " is already deleted.");
+      return;
+    }
+    
+    // try to delete the column group:
+    
+    // first check if the user has enough permissions to list the directory
+    fs.listStatus(cgPath);   
+    
+    //verify if the user has enough permissions by trying to create
+    //a temporary file in cg.
+    OutputStream out = fs.create(
+              new Path(cgPath, ".tmp" + DELETED_CG_PREFIX + cgName), true);
+    out.close();
+    
+    //First try to create a file indicating a column group is deleted.
+    try {
+      Path deletedCGPath = new Path(path, DELETED_CG_PREFIX + cgName);
+      // create without overriding.
+      out = fs.create(deletedCGPath, false);
+      // should we write anything?
+      out.close();
+    } catch (IOException e) {
+      // one remote possibility is that another user 
+      // already deleted CG. 
+      SchemaFile tempSchema = new SchemaFile(path, conf);
+      if (tempSchema.isCGDeleted(cgIdx)) {
+        LOG.info(path + " : " + cgName + 
+                 " is deleted by someone else. That is ok.");
+        return;
+      }
+      // otherwise, it is some other error.
+      throw e;
+    }
+    
+    // At this stage, the CG is marked deleted. Now just try to
+    // delete the actual directory:
+    if (!fs.delete(cgPath, true)) {
+      String msg = path + " : Could not detete column group " +
+                   cgName + ". It is marked deleted.";
+      LOG.warn(msg);
+      throw new IOException(msg);
+    }
+    
+    LOG.info("Dropped " + cgName + " from " + path);
+  }
+  
+  /**
    * BasicTable reader.
    */
   public static class Reader implements Closeable {
@@ -96,6 +207,7 @@
     boolean inferredMapping;
     private MetaFile.Reader metaReader;
     private BasicTableStatus status;
+    private int firstValidCG = -1; /// First column group that exists.
     Partition partition;
     ColumnGroup.Reader[] colGroups;
     Tuple[] cgTuples;
@@ -103,9 +215,20 @@
     private synchronized void checkInferredMapping() throws ParseException, 
IOException {
       if (!inferredMapping) {
         for (int i = 0; i < colGroups.length; ++i) {
-          colGroups[i].setProjection(partition.getProjection(i));
-          if (partition.isCGNeeded(i))
-            cgTuples[i] = TypesUtils.createTuple(colGroups[i].getSchema());
+          if (colGroups[i] != null) {
+            colGroups[i].setProjection(partition.getProjection(i));
+          } 
+          if (partition.isCGNeeded(i)) {
+            if (isCGDeleted(i)) {
+              // this is a deleted column group. Warn about it.
+              LOG.warn("Trying to read from deleted column group " + 
+                       schemaFile.getName(i) + 
+                       ". NULL is returned for corresponding columns. " +
+                       "Table at " + path);
+            } else {
+              cgTuples[i] = TypesUtils.createTuple(colGroups[i].getSchema());
+            }
+          }
           else
             cgTuples[i] = null;
         }
@@ -119,6 +242,13 @@
     }
 
     /**
+     * Returns true if a column group is deleted.
+     */
+    private boolean isCGDeleted(int nx) {
+      return colGroups[nx] == null;
+    }
+    
+    /**
      * Create a BasicTable reader.
      * 
      * @param path
@@ -143,15 +273,19 @@
         String storage = schemaFile.getStorageString();
         partition = new Partition(schema, projection, storage);
         for (int nx = 0; nx < numCGs; nx++) {
-          colGroups[nx] =
-            new ColumnGroup.Reader(new Path(path, 
partition.getCGSchema(nx).getName()),
-                  conf);
-          if (partition.isCGNeeded(nx))
+          if (!schemaFile.isCGDeleted(nx)) {
+            colGroups[nx] =
+              new ColumnGroup.Reader(new Path(path, 
partition.getCGSchema(nx).getName()),
+                                     conf);
+            if (firstValidCG < 0) {
+              firstValidCG = nx;
+            }
+          }
+          if (colGroups[nx] != null && partition.isCGNeeded(nx))
             cgTuples[nx] = TypesUtils.createTuple(colGroups[nx].getSchema());
           else
             cgTuples[nx] = null;
         }
-        partition.setSource(cgTuples);
         buildStatus();
         closed = false;
       }
@@ -254,8 +388,10 @@
         throws IOException {
       BlockDistribution bd = new BlockDistribution();
       for (int nx = 0; nx < colGroups.length; nx++) {
-        bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split
+        if (!isCGDeleted(nx)) {
+          bd.add(colGroups[nx].getBlockDistribution(split == null ? null : 
split
             .get(nx)));
+        }
       }
       return bd;
     }
@@ -275,7 +411,9 @@
       KeyDistribution kd =
           new 
KeyDistribution(TFile.makeComparator(schemaFile.getComparator()));
       for (int nx = 0; nx < colGroups.length; nx++) {
-        kd.add(colGroups[nx].getKeyDistribution(n));
+        if (!isCGDeleted(nx)) {
+           kd.add(colGroups[nx].getKeyDistribution(n));
+        }
       }
       if (kd.size() > (int) (n * 1.5)) {
         kd.resize(n);
@@ -384,12 +522,16 @@
       List<CGRangeSplit>[] cgSplitsAll = new ArrayList[colGroups.length];
       // split each CG
       for (int nx = 0; nx < colGroups.length; nx++) {
-        cgSplitsAll[nx] = colGroups[nx].rangeSplit(n);
+        if (!isCGDeleted(nx))
+          cgSplitsAll[nx] = colGroups[nx].rangeSplit(n);
       }
 
       // verify all CGs have same number of slices
       int numSlices = -1;
       for (int nx = 0; nx < cgSplitsAll.length; nx++) {
+        if (isCGDeleted(nx)) {
+          continue;
+        }
         if (numSlices < 0) {
           numSlices = cgSplitsAll[nx].size();
         }
@@ -398,12 +540,22 @@
               "BasicTable's column groups were not equally split.");
         }
       }
+      if (numSlices <= 0) {
+        // This could happen because of various reasons.
+        // One possibility is that all the CGs are deleted.
+        numSlices = 1;
+      }
       // return horizontal slices as RangeSplits
       List<RangeSplit> ret = new ArrayList<RangeSplit>(numSlices);
       for (int slice = 0; slice < numSlices; slice++) {
         CGRangeSplit[] oneSliceSplits = new CGRangeSplit[cgSplitsAll.length];
         for (int cgIndex = 0; cgIndex < cgSplitsAll.length; cgIndex++) {
-          oneSliceSplits[cgIndex] = cgSplitsAll[cgIndex].get(slice);
+          if (isCGDeleted(cgIndex)) {
+            // set a dummy split
+            oneSliceSplits[cgIndex] = new CGRangeSplit(0, 0);
+          } else {
+            oneSliceSplits[cgIndex] = cgSplitsAll[cgIndex].get(slice);
+          }
         }
         ret.add(new BasicTable.Reader.RangeSplit(oneSliceSplits));
       }
@@ -420,7 +572,9 @@
           closed = true;
           metaReader.close();
           for (int i = 0; i < colGroups.length; ++i) {
-            colGroups[i].close();
+            if (colGroups[i] != null) {
+              colGroups[i].close();
+            }
           }
         }
         finally {
@@ -452,12 +606,20 @@
 
     private void buildStatus() {
       status = new BasicTableStatus();
-      status.beginKey = colGroups[0].getStatus().getBeginKey();
-      status.endKey = colGroups[0].getStatus().getEndKey();
-      status.rows = colGroups[0].getStatus().getRows();
+      if (firstValidCG >= 0) {
+        status.beginKey = colGroups[firstValidCG].getStatus().getBeginKey();
+        status.endKey = colGroups[firstValidCG].getStatus().getEndKey();
+        status.rows = colGroups[firstValidCG].getStatus().getRows();
+      } else {
+        status.beginKey = new BytesWritable(new byte[0]);
+        status.endKey = status.beginKey;
+        status.rows = 0;
+      }
       status.size = 0;
       for (int nx = 0; nx < colGroups.length; nx++) {
-        status.size += colGroups[nx].getStatus().getSize();
+        if (colGroups[nx] != null) {
+          status.size += colGroups[nx].getStatus().getSize();
+        }
       }
     }
 
@@ -549,14 +711,18 @@
           schema = partition.getProjection();
           cgScanners = new TableScanner[colGroups.length];
           for (int i = 0; i < colGroups.length; ++i) {
-            // if no CG is needed explicitly by projection but the "countRow" 
still needs to access some column group
-            if (partition.isCGNeeded(i) || (!anyScanner && (i == 
colGroups.length-1)))
+            if (!isCGDeleted(i) && partition.isCGNeeded(i)) 
             {
               anyScanner = true;
               cgScanners[i] = colGroups[i].getScanner(beginKey, endKey, false);
             } else
               cgScanners[i] = null;
           }
+          if (!anyScanner && firstValidCG >= 0) {
+            // if no CG is needed explicitly by projection but the "countRow" 
still needs to access some column group
+            cgScanners[firstValidCG] = colGroups[firstValidCG].
+                                         getScanner(beginKey, endKey, false);
+          }
           this.closeReader = closeReader;
           sClosed = false;
         }
@@ -591,7 +757,7 @@
           boolean anyScanner = false;
           for (int i = 0; i < colGroups.length; ++i) {
             // if no CG is needed explicitly by projection but the "countRow" 
still needs to access some column group
-            if (partition.isCGNeeded(i) || (!anyScanner && (i == 
colGroups.length-1)))
+            if (!isCGDeleted(i) && partition.isCGNeeded(i))
             {
               cgScanners[i] =
                   colGroups[i].getScanner(split == null ? null : split.get(i),
@@ -600,6 +766,11 @@
             } else
               cgScanners[i] = null;
           }
+          if (!anyScanner && firstValidCG >= 0) {
+            // if no CG is needed explicitly by projection but the "countRow" 
still needs to access some column group
+            cgScanners[firstValidCG] = colGroups[firstValidCG].
+              getScanner(split == null ? null : split.get(firstValidCG), 
false);
+          }
           this.partition = partition;
           this.closeReader = closeReader;
           sClosed = false;
@@ -1232,7 +1403,12 @@
     boolean sorted;
     String storage;
     CGSchema[] cgschemas;
-
+    
+    // Array indicating if a physical schema is already dropped
+    // It is probably better to create "CGProperties" class and
+    // store multiple properties like name there.
+    boolean[] cgDeletedFlags;
+   
     // ctor for reading
     public SchemaFile(Path path, Configuration conf) throws IOException {
       readSchemaFile(path, conf);
@@ -1260,6 +1436,7 @@
       for (int nx = 0; nx < cgschemas.length; nx++) {
         physical[nx] = cgschemas[nx].getSchema();
       }
+      cgDeletedFlags = new boolean[physical.length];
       this.sorted = sorted;
       version = SCHEMA_VERSION;
 
@@ -1303,6 +1480,24 @@
       return cgschemas[nx].getCompressor();
     }
 
+    /** 
+     * Returns the index for CG with the given name.
+     * -1 indicates that there is no CG with the name.
+     */
+    int getCGByName(String cgName) {
+      for(int i=0; i<physical.length; i++) {
+        if (cgName.equals(getName(i))) {
+          return i;
+        }
+      }
+      return -1;
+    }
+    
+    /** Returns true if the CG at the given index is deleted. */
+    boolean isCGDeleted(int idx) {
+      return cgDeletedFlags[idx];
+    }
+    
     public String getOwner(int nx) {
         return cgschemas[nx].getOwner();
       }
@@ -1315,7 +1510,6 @@
         return cgschemas[nx].getPerm();
     }
     
-    
     /**
      * @return the string representation of the physical schema.
      */
@@ -1377,8 +1571,10 @@
       catch (Exception e) {
         throw new IOException("Partition constructor failed :" + 
e.getMessage());
       }
+      cgschemas = partition.getCGSchemas();
       int numCGs = WritableUtils.readVInt(in);
       physical = new Schema[numCGs];
+      cgDeletedFlags = new boolean[physical.length];
       TableSchemaParser parser;
       String cgschemastr;
       try {
@@ -1392,12 +1588,37 @@
         throw new IOException("parser.RecordSchema failed :" + e.getMessage());
       }
       sorted = WritableUtils.readVInt(in) == 1 ? true : false;
+      setCGDeletedFlags(path, conf);
       in.close();
     }
 
     private static Path makeSchemaFilePath(Path parent) {
       return new Path(parent, BT_SCHEMA_FILE);
     }
+    
+    /**
+     * Sets the cgDeletedFlags array by checking for the presence of a
+     * ".deleted-CGNAME" marker file in the table's top-level
+     * directory.
+     */
+    void setCGDeletedFlags(Path path, Configuration conf) throws IOException {
+      
+      Set<String> deletedCGs = new HashSet<String>(); 
+      
+      for (FileStatus file : path.getFileSystem(conf).listStatus(path)) {
+        if (!file.isDir()) {
+           String fname =  file.getPath().getName();
+           if (fname.startsWith(DELETED_CG_PREFIX)) {
+             deletedCGs.add(fname.substring(DELETED_CG_PREFIX.length()));
+           }
+        }
+      }
+      
+      for(int i=0; i<physical.length; i++) {
+        cgDeletedFlags[i] = 
+          deletedCGs.contains(getName(i));
+      }
+    }
   }
 
   static public void dumpInfo(String file, PrintStream out, Configuration conf)
@@ -1421,7 +1642,15 @@
       for (int nx = 0; nx < reader.colGroups.length; nx++) {
         IOutils.indent(out, indent);
         out.printf("\nColumn Group [%d] :", nx);
-        ColumnGroup.dumpInfo(reader.colGroups[nx].path, out, conf, indent);
+        if (reader.colGroups[nx] != null) {
+          ColumnGroup.dumpInfo(reader.colGroups[nx].path, out, conf, indent);
+        } else {
+          // print basic info for deleted column groups.
+          out.printf("\nColum Group : DELETED");
+          out.printf("\nName : %s", reader.schemaFile.getName(nx));
+          out.printf("\nSchema : %s\n", 
+                     reader.schemaFile.cgschemas[nx].getSchema().toString());
+        }
       }
     }
     catch (Exception e) {

Modified: 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
 Fri Oct 16 22:15:59 2009
@@ -426,6 +426,12 @@
       src.setProjIndex(mSize++);
     }
 
+    void cleanup()
+    {
+      for (int i = 0;  i < mSources.size(); i++)
+        mSources.get(i).cleanup();
+    }
+
     void insert(final BytesWritable key) throws ExecException {
       for (int i = 0; i < mSize; i++)
         ((Tuple) mTuple).set(mSources.get(i).getProjIndex(), mSources.get(i)
@@ -488,6 +494,7 @@
     private Object mTuple = null;
     private boolean mNeedTmpTuple;
     private HashSet<String> mKeys; // interested hash keys
+    private PartitionedColumn parent = null;
 
     PartitionedColumn(int fi, boolean needTmpTuple)
         throws IOException {
@@ -505,6 +512,10 @@
       mKeys = keys;
     }
 
+    private void setParent(PartitionedColumn parent) {
+      this.parent = parent;
+    }
+
     /**
      * stitch op
      */
@@ -580,6 +591,28 @@
       if (mChildren == null) mChildren = new ArrayList<PartitionedColumn>();
       mChildren.add(child);
       mChildrenLen++;
+      child.setParent(this);
+    }
+
+    void cleanup() {
+      if (parent != null) {
+        parent.removeChild(this);
+      }
+      if (mNeedTmpTuple && mTuple != null)
+        mTuple = null;
+    }
+
+    void removeChild(PartitionedColumn child)
+    {
+      for (int i = 0; i < mChildrenLen; i++)
+      {
+        if (mChildren.get(i) == child)
+        {
+          mChildren.remove(i);
+          mChildrenLen--;
+          i--;
+        }
+      }
     }
 
     void setProjIndex(int projindex) {
@@ -1025,8 +1058,7 @@
       cgentry = getCGEntry(getCGIndex(child).getCGIndex());
 
       PartitionedColumn mapParCol =
-          new PartitionedColumn(i, Partition.SplitType.MAP, true);
-      mPCNeedTmpTuple.add(mapParCol);
+          new PartitionedColumn(i, Partition.SplitType.MAP, false);
       cgentry.addUser(mapParCol, getCGName(child));
       mExecs.add(mapParCol); // not a leaf : MAP stitch needed
       mStitchSize++;
@@ -1047,12 +1079,11 @@
         if (!projectedCGs.contains(index))
         {
           PartitionedColumn parCol =
-             new PartitionedColumn(0, true);
-          mPCNeedTmpTuple.add(parCol);
+             new PartitionedColumn(0, false);
+          // mPCNeedTmpTuple.add(parCol);
           cgentry.addUser(parCol, getCGName(child), cgindex.getKeys());
           mapParCol.addChild(parCol); // contribute to the non-key-partitioned
          // hashes
-          mPCNeedMap.add(parCol);
          projectedCGs.add(index);
         }
       }
@@ -1079,19 +1110,17 @@
         cgentry = getCGEntry(mapentry.getKey().getCGIndex());
         if (needParent)
         {
-          parCol = new PartitionedColumn(i, Partition.SplitType.MAP, true);
+          parCol = new PartitionedColumn(i, Partition.SplitType.MAP, false);
           mExecs.add(parCol); // not a leaf : MAP stitch needed
           mStitchSize++;
-          mPCNeedMap.add(parCol);
           parent.addChild(parCol);
           parent = parCol;
           needParent = false;
           newParent = true;
         } else {
-          parCol = new PartitionedColumn(newParent ? 0 : i, true);
+          parCol = new PartitionedColumn(newParent ? 0 : i, false);
           parent.addChild(parCol);
         }
-        mPCNeedTmpTuple.add(parCol);
         cgentry.addUser(parCol, getCGName(child), projectedKeys);
       }
     }
@@ -1250,7 +1279,14 @@
       throw new ParseException(
           "Internal Logical Error: Invalid number of column groups");
     for (int i = 0; i < tuples.length; i++) {
-      if (mCGs.get(i) != null) mCGs.get(i).setSource(tuples[i]);
+      if (mCGs.get(i) != null) {
+        if (tuples[i] == null) {
+          mCGs.get(i).cleanup();
+          mCGs.remove(i);
+        } else {
+          mCGs.get(i).setSource(tuples[i]);
+        }
+      }
     }
   }
 

Modified: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
 Fri Oct 16 22:15:59 2009
@@ -69,19 +69,19 @@
   public static void tearDownOnce() throws IOException {
   }
 
-  BytesWritable makeRandomKey(int max) {
+  static BytesWritable makeRandomKey(int max) {
     return makeKey(random.nextInt(max));
   }
 
-  BytesWritable makeKey(int i) {
+  static BytesWritable makeKey(int i) {
     return new BytesWritable(String.format("key%09d", i).getBytes());
   }
 
-  String makeString(String prefix, int max) {
+  static String makeString(String prefix, int max) {
     return String.format("%s%09d", prefix, random.nextInt(max));
   }
 
-  int createBasicTable(int parts, int rows, String strSchema, String storage,
+  static int createBasicTable(int parts, int rows, String strSchema, String 
storage,
       Path path, boolean properClose, boolean sorted) throws IOException {
     if (fs.exists(path)) {
       BasicTable.drop(path, conf);
@@ -133,7 +133,7 @@
     return total;
   }
 
-  void rangeSplitBasicTable(int numSplits, int totalRows, String strProjection,
+  static void rangeSplitBasicTable(int numSplits, int totalRows, String 
strProjection,
       Path path) throws IOException, ParseException {
     BasicTable.Reader reader = new BasicTable.Reader(path, conf);
     reader.setProjection(strProjection);
@@ -153,7 +153,7 @@
     // TODO: verify tuples contains the right projected values
   }
 
-  void doRangeSplit(int[] numSplits, int totalRows, String projection, Path 
path)
+  static void doRangeSplit(int[] numSplits, int totalRows, String projection, 
Path path)
       throws IOException, ParseException {
     for (int i : numSplits) {
       if (i > 0) {
@@ -162,7 +162,7 @@
     }
   }
 
-  void keySplitBasicTable(int numSplits, int totalRows, String strProjection,
+  static void keySplitBasicTable(int numSplits, int totalRows, String 
strProjection,
       Path path) throws IOException, ParseException {
     BasicTable.Reader reader = new BasicTable.Reader(path, conf);
     reader.setProjection(strProjection);
@@ -211,7 +211,7 @@
     Assert.assertEquals(total, totalRows);
   }
 
-  void doKeySplit(int[] numSplits, int totalRows, String projection, Path path)
+  static void doKeySplit(int[] numSplits, int totalRows, String projection, 
Path path)
       throws IOException, ParseException {
     for (int i : numSplits) {
       if (i > 0) {
@@ -220,7 +220,7 @@
     }
   }
 
-  BasicTableStatus getStatus(Path path) throws IOException {
+  static BasicTableStatus getStatus(Path path) throws IOException {
     BasicTable.Reader reader = new BasicTable.Reader(path, conf);
     try {
       return reader.getStatus();
@@ -229,7 +229,7 @@
     }
   }
 
-  void doReadWrite(Path path, int parts, int rows, String schema,
+  static void doReadWrite(Path path, int parts, int rows, String schema,
       String storage, String projection, boolean properClose, boolean sorted)
       throws IOException, ParseException {
     int totalRows = createBasicTable(parts, rows, schema, storage, path,
@@ -261,7 +261,7 @@
     doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, true);
   }
 
-  int doReadOnly(TableScanner scanner) throws IOException, ParseException {
+  static int doReadOnly(TableScanner scanner) throws IOException, 
ParseException {
     int total = 0;
     BytesWritable key = new BytesWritable();
     Tuple value = TypesUtils.createTuple(scanner.getSchema());

Modified: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
 (original)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
 Fri Oct 16 22:15:59 2009
@@ -36,6 +36,7 @@
   TestColumnGroupReaders.class,
   TestColumnGroupSchemas.class,
   TestColumnGroupSplits.class,
+  TestDropColumnGroup.class,
   TestMap.class,
   TestMapOfRecord.class,
   TestMixedType1.class,

Added: 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java?rev=826110&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
 (added)
+++ 
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
 Fri Oct 16 22:15:59 2009
@@ -0,0 +1,797 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import java.util.List;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.parser.ParseException;
+
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import org.junit.AfterClass;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestDropColumnGroup {
+  Log LOG = LogFactory.getLog(TestDropColumnGroup.class);
+  private static Path path;
+  private static Configuration conf;
+
+  @BeforeClass
+  public static void setUpOnce() throws IOException {
+    TestBasicTable.setUpOnce();
+    path = new Path(TestBasicTable.rootPath, "DropCGTest");
+    conf = TestBasicTable.conf;
+    Log LOG = LogFactory.getLog(TestDropColumnGroup.class);
+
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    BasicTable.drop(path, conf);
+  }
+
+  /**
+   * Utility function to open a table with a given projection and verify that
+   * certain fields in the returned tuple are null and certain fields are not.
+   */
+  void verifyScanner(Path path, Configuration conf, String projection,
+      boolean isNullExpected[], int numRowsToRead) throws IOException,
+      ParseException {
+
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(null, true);
+
+    Tuple row = TypesUtils.createTuple(reader.getSchema());
+
+    for (int i = 0; i < numRowsToRead; i++) {
+      scanner.getValue(row);
+      for (int f = 0; f < isNullExpected.length; f++) {
+        if (isNullExpected[f] ^ row.get(f) == null) {
+          throw new IOException("Verification failure at field " + f + " row "
+              + i + " : expected " + (isNullExpected[f] ? "NULL" : "nonNULL")
+              + " but got opposite.");
+
+        }
+      }
+      scanner.advance();
+    }
+
+    scanner.close();
+  }
+
+  int countRows(Path path, Configuration conf, String projection)
+      throws IOException, ParseException {
+    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+    reader.setProjection(projection);
+    TableScanner scanner = reader.getScanner(null, true);
+    int count = 0;
+    while (!scanner.atEnd()) {
+      count++;
+      scanner.advance();
+    }
+    scanner.close();
+    return count;
+  }
+
+  @Test
+  public void testDropColumnGroup() throws IOException, ParseException {
+    /*
+     * Tests basic drop columns feature. Also tests that fields in dropped
+     * columns can still be read and that the value returned is null.
+     */
+
+    BasicTable.drop(path, conf);
+
+    int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+        "[a, b]; [c, d]", path, true, false);
+
+    int rowsToRead = Math.min(10, numRows);
+
+    // normal table.
+    verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+        rowsToRead);
+
+    // Now delete CG1 ([c, d])
+    BasicTable.dropColumnGroup(path, conf, "CG1");
+
+    // check various read cases.
+    verifyScanner(path, conf, "c, a", new boolean[] { true, false }, rowsToRead);
+    verifyScanner(path, conf, "c, a", new boolean[] { true, false }, rowsToRead);
+
+    verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, false,
+        false, false, true, false }, rowsToRead);
+
+    BasicTable.dumpInfo(path.toString(), System.err, conf);
+
+    // Drop CG0 ([a, b])
+    BasicTable.dropColumnGroup(path, conf, "CG0");
+
+    verifyScanner(path, conf, "a, b", new boolean[] { true, true }, rowsToRead);
+
+    // Drop remaining CG2
+    BasicTable.dropColumnGroup(path, conf, "CG2");
+
+    verifyScanner(path, conf, "a, b, c, d, e, f", new boolean[] { true, true,
+        true, true, true, true }, rowsToRead);
+
+    // Now make sure the reader reports zero rows.
+    Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+
+    // delete the table
+    BasicTable.drop(path, conf);
+
+    /*
+     * Try similar tests with range splits.
+     */
+
+    // 5 splits and 50 rows
+    numRows = TestBasicTable.createBasicTable(5, 50, "a, b, c, d, e, f",
+        "[a, b]; [c, d]; [e] as myCG", path, true, false);
+
+    BasicTable.dropColumnGroup(path, conf, "myCG");
+
+    verifyScanner(path, conf, "e, c, g, b", new boolean[] { true, false, true,
+        false }, numRows);
+
+    TestBasicTable.doRangeSplit(new int[] { 4, 0, 2 }, numRows,
+        "a, b, e, f, x", path);
+
+    // Remove another CG.
+    BasicTable.dropColumnGroup(path, conf, "CG0");
+
+    TestBasicTable.doRangeSplit(new int[] { 4, 0, 2, 3, 1 }, numRows,
+        "a, y, e, f, x", path);
+
+    BasicTable.drop(path, conf);
+  }
+
+  @Test
+  public void testDropColumnGroupsMixedTypes() throws IOException, ParseException {
+
+    String mixedSchema = /* roughly borrowed from testMixedType1.java */
+    "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, "
+        + "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4)),"
+        + "m1:map(string),m2:map(map(int)), "
+        + "c:collection(f13:double, f14:float, f15:bytes)";
+    // [s1, s2]; [m1#{a}]; [r1.f1]; [s3, s4, r2.r3.f3]; [s5, s6, m2#{x|y}];
+    // [r1.f2, m1#{b}]; [r2.r3.f4, m2#{z}]";
+    String mixedStorageHint = "[s1, s2]                      as simpleCG; "
+        + "[m1#{a}, s3]                  as mapCG; "
+        + "[s4, r2.r3.f3, r1.f1]         as recordCG; "
+        + "[c]                           as collectionCG; "  
+        + "[r1.f2, m1#{b}, m2#{z}]       as mapRecordCG; ";
+
+    Path path = new Path(TestBasicTable.rootPath, "DropCGTest");
+    Configuration conf = TestBasicTable.conf;
+    conf.set("fs.default.name", "file:///");
+
+    BasicTable.drop(path, conf);
+
+    // first write the table :
+    BasicTable.Writer writer = new BasicTable.Writer(path, mixedSchema,
+        mixedStorageHint, false, conf);
+    writer.finish();
+
+    Schema schema = writer.getSchema();
+    Tuple tuple = TypesUtils.createTuple(schema);
+    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+    int part = 0;
+    TableInserter inserter = writer1.getInserter("part" + part, true);
+    TypesUtils.resetTuple(tuple);
+
+    Tuple tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
+        .getSchema());
+    Tuple tupRecord2 = TypesUtils.createTuple(schema.getColumnSchema("r2")
+        .getSchema());
+
+    Tuple tupRecord3 = TypesUtils.createTuple(new Schema("f3:float, f4"));
+
+    // row 1
+    tuple.set(0, true); // bool
+    tuple.set(1, 1); // int
+    tuple.set(2, 1001L); // long
+    tuple.set(3, 1.1); // float
+    tuple.set(4, "hello world 1"); // string
+    tuple.set(5, new DataByteArray("hello byte 1")); // byte
+
+    // r1:record(f1:int, f2:long)
+    tupRecord1.set(0, 1);
+    tupRecord1.set(1, 1001L);
+    tuple.set(6, tupRecord1);
+
+    // r2:record(r3:record(f3:float, f4))
+    tupRecord2.set(0, tupRecord3);
+    tupRecord3.set(0, 1.3);
+    tupRecord3.set(1, new DataByteArray("r3 row 1 byte array "));
+    tuple.set(7, tupRecord2);
+
+    // m1:map(string)
+    Map<String, String> m1 = new HashMap<String, String>();
+    m1.put("a", "A");
+    m1.put("b", "B");
+    m1.put("c", "C");
+    tuple.set(8, m1);
+
+    // m2:map(map(int))
+    HashMap<String, Map<String, Integer>> m2 = new HashMap<String, Map<String, Integer>>();
+    Map<String, Integer> m3 = new HashMap<String, Integer>();
+    m3.put("m311", 311);
+    m3.put("m321", 321);
+    m3.put("m331", 331);
+    Map<String, Integer> m4 = new HashMap<String, Integer>();
+    m4.put("m411", 411);
+    m4.put("m421", 421);
+    m4.put("m431", 431);
+    m2.put("x", m3);
+    m2.put("y", m4);
+    tuple.set(9, m2);
+
+    // c:collection(f13:double, f14:float, f15:bytes)
+    DataBag bagColl = TypesUtils.createBag();
+    Schema schColl = schema.getColumn(10).getSchema();
+    Tuple tupColl1 = TypesUtils.createTuple(schColl);
+    Tuple tupColl2 = TypesUtils.createTuple(schColl);
+    byte[] abs1 = new byte[3];
+    byte[] abs2 = new byte[4];
+    tupColl1.set(0, 3.1415926);
+    tupColl1.set(1, 1.6);
+    abs1[0] = 11;
+    abs1[1] = 12;
+    abs1[2] = 13;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 123.456789);
+    tupColl2.set(1, 100);
+    abs2[0] = 21;
+    abs2[1] = 22;
+    abs2[2] = 23;
+    abs2[3] = 24;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(10, bagColl);
+
+    int row = 0;
+    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
+        .getBytes()), tuple);
+    // row 2
+    row++;
+    TypesUtils.resetTuple(tuple);
+    TypesUtils.resetTuple(tupRecord1);
+    TypesUtils.resetTuple(tupRecord2);
+    TypesUtils.resetTuple(tupRecord3);
+    m1.clear();
+    m2.clear();
+    m3.clear();
+    m4.clear();
+    tuple.set(0, false);
+    tuple.set(1, 2); // int
+    tuple.set(2, 1002L); // long
+    tuple.set(3, 3.1); // float
+    tuple.set(4, "hello world 2"); // string
+    tuple.set(5, new DataByteArray("hello byte 2")); // byte
+
+    // r1:record(f1:int, f2:long)
+    tupRecord1.set(0, 2);
+
+    tupRecord1.set(1, 1002L);
+    tuple.set(6, tupRecord1);
+
+    // r2:record(r3:record(f3:float, f4))
+    tupRecord2.set(0, tupRecord3);
+    tupRecord3.set(0, 2.3);
+    tupRecord3.set(1, new DataByteArray("r3 row2  byte array"));
+    tuple.set(7, tupRecord2);
+
+    // m1:map(string)
+    m1.put("a2", "A2");
+    m1.put("b2", "B2");
+    m1.put("c2", "C2");
+    tuple.set(8, m1);
+
+    // m2:map(map(int))
+    m3.put("m321", 321);
+    m3.put("m322", 322);
+    m3.put("m323", 323);
+    m2.put("z", m3);
+    tuple.set(9, m2);
+
+    // c:collection(f13:double, f14:float, f15:bytes)
+    bagColl.clear();
+    TypesUtils.resetTuple(tupColl1);
+    TypesUtils.resetTuple(tupColl2);
+    tupColl1.set(0, 7654.321);
+    tupColl1.set(1, 0.0001);
+    abs1[0] = 31;
+    abs1[1] = 32;
+    abs1[2] = 33;
+    tupColl1.set(2, new DataByteArray(abs1));
+    bagColl.add(tupColl1);
+    tupColl2.set(0, 0.123456789);
+    tupColl2.set(1, 0.3333);
+    abs2[0] = 41;
+    abs2[1] = 42;
+    abs2[2] = 43;
+    abs2[3] = 44;
+    tupColl2.set(2, new DataByteArray(abs2));
+    bagColl.add(tupColl2);
+    tuple.set(10, bagColl);
+
+    // Write same row 10 times:
+    for (int i = 0; i < 10; i++) {
+      inserter.insert(new BytesWritable(String.format("k%d%d", part + 1 + i,
+          row + 1 + i).getBytes()), tuple);
+    }
+
+    inserter.close();
+    writer1.finish();
+
+    writer.close();
+
+    int numRows = 11;
+    // drop mapCG: removes [m1#{a}, s3]
+    BasicTable.dropColumnGroup(path, conf, "mapCG");
+
+    verifyScanner(path, conf, "m1", new boolean[] { false }, numRows);
+
+    verifyScanner(path, conf, "s1, m1#{a}, m1#{b}, s3, s4", new boolean[] {
+        false, true, false, true, false }, numRows);
+
+    // drop simpleCG : removes [s1, s2]
+    BasicTable.dropColumnGroup(path, conf, "simpleCG");
+    verifyScanner(path, conf, "s1, m1#{a}, s2, m1#{b}", new boolean[] { true,
+        true, true, false }, numRows);
+
+    // drop mapRecordCG : removes [r1.f2, m1#{b}, m2#{z}]
+    BasicTable.dropColumnGroup(path, conf, "mapRecordCG");
+    verifyScanner(path, conf, "r1.f1, r1.f2, m1#{a}, s5, m1#{b}",
+        new boolean[] { false, true, true, false, true }, numRows);
+
+    // drop collectionCG : removes [c]
+    BasicTable.dropColumnGroup(path, conf, "collectionCG");
+    verifyScanner(path, conf, "c.f1, c.f2, c.f3", new boolean[] { true, true,
+        true }, numRows);
+
+    // clean up the table
+    BasicTable.drop(path, conf);
+  }
+
+  @Test
+  public void test2() throws IOException, ParseException {
+    /*
+     * Tests concurrent drop CGs
+     */
+
+    BasicTable.drop(path, conf);
+
+    int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+        "[a];[b];[c];[d];[e];[f]", path, true, false);
+    System.out.println("Frist dump:");
+    BasicTable.dumpInfo(path.toString(), System.out, conf);
+    int rowsToRead = Math.min(10, numRows);
+
+    // normal table.
+    verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+        rowsToRead);
+
+    // create a thread for each dropCG
+    DropThread[] threads = new DropThread[6];
+
+    for (int i = 0; i < threads.length; i++) {
+
+      threads[i] = new DropThread(i);
+    }
+
+    // start the threads
+    for (int j = 0; j < threads.length; j++) {
+      threads[j].start();
+    }
+
+    for (Thread thr : threads) {
+      try {
+        thr.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    // check various read cases.
+
+    verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, true,
+        true, true, true, true }, rowsToRead);
+    System.out.println("second dump");
+    BasicTable.dumpInfo(path.toString(), System.out, conf);
+
+    // Now make sure the reader reports zero rows.
+    Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+
+    // delete the table
+    BasicTable.drop(path, conf);
+
+  }
+
+  @Test
+  public void test3() throws IOException, ParseException {
+    /*
+     * Tests concurrent drop CGs while one fails
+     */
+
+    BasicTable.drop(path, conf);
+
+    int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+        "[a];[b];[c];[d];[e];[f]", path, true, false);
+    System.out.println("Frist dump:");
+    BasicTable.dumpInfo(path.toString(), System.out, conf);
+    int rowsToRead = Math.min(10, numRows);
+
+    // normal table.
+    verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+        rowsToRead);
+
+    // create a thread for each dropCG
+    DropThread[] threads = new DropThread[7];
+
+    for (int i = 0; i < threads.length; i++) {
+
+      threads[i] = new DropThread(i);
+    }
+
+    // start the threads
+    for (int j = 0; j < threads.length; j++) {
+      threads[j].start();
+    }
+
+    for (Thread thr : threads) {
+      try {
+        thr.join();
+      } catch (InterruptedException e) {
+      }
+    }
+
+    // check various read cases.
+
+    verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, true,
+        true, true, true, true }, rowsToRead);
+    System.out.println("second dump");
+    BasicTable.dumpInfo(path.toString(), System.out, conf);
+
+    // Now make sure the reader reports zero rows.
+    Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+
+    // delete the table
+    BasicTable.drop(path, conf);
+  }
+
+  @Test
+  public void test5() throws IOException, ParseException {
+    /*
+     * Tests drop CGs while reading the same CGs
+     */
+
+    System.out.println("######int test 5");
+    BasicTable.drop(path, conf);
+
+    int numRows = TestBasicTable.createBasicTable(1, 100000,
+        "a, b, c, d, e, f", "[a, b]; [c, d]", path, true, false);
+
+    System.out.println("in test5 , dump infor 1");
+    BasicTable.dumpInfo(path.toString(), System.out, conf);
+
+    int minRowsToRead = 10000;
+    int numOfReadThreads = 20;
+    int rowsToRead = Math.min(minRowsToRead, numRows);
+
+    // normal table.
+    verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+        rowsToRead);
+
+    // create a thread for each dropCG
+    DropThread[] dropThreads = new DropThread[3];
+
+    for (int i = 0; i < dropThreads.length; i++) {
+
+      dropThreads[i] = new DropThread(i);
+    }
+
+    // start the threads
+    for (int j = 0; j < dropThreads.length; j++) {
+      dropThreads[j].start();
+    }
+
+    // create read threads
+    ReadThread[] readThreads = new ReadThread[numOfReadThreads];
+
+    for (int i = 0; i < readThreads.length; i++) {
+
+      readThreads[i] = new ReadThread(i, "a, b, c, d, e, f", 1000);
+    }
+
+    // start the threads
+    for (int j = 0; j < readThreads.length; j++) {
+      readThreads[j].start();
+    }
+
+    for (Thread thr : dropThreads) {
+      try {
+        thr.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+    for (Thread thr : readThreads) {
+      try {
+        thr.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, true,
+        true, true, true, true }, rowsToRead);
+    System.out.println("second dump");
+    BasicTable.dumpInfo(path.toString(), System.out, conf);
+
+    // Now make sure the reader reports zero rows.
+    Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+
+    // delete the table
+    BasicTable.drop(path, conf);
+
+  }
+
+  @Test
+  public void test11() throws IOException, ParseException {
+
+    /*
+     * Tests opening a non-existing table.
+     */
+
+    try {
+      new BasicTable.Reader(new Path(path.toString(), "non-existing"), conf);
+      Assert.fail("read none existing table should fail");
+    } catch (Exception e) {
+
+    }
+
+  }
+
+  @Test
+  public void test12() throws IOException, ParseException {
+    /*
+     * Tests API, path is wrong
+     */
+
+    BasicTable.drop(path, conf);
+
+    TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+        "[a];[b];[c];[d];[e];[f]", path, true, false);
+    Path wrongPath = new Path(path.toString() + "non-existing");
+    try {
+      BasicTable.dropColumnGroup(wrongPath, conf, "CG0");
+      Assert.fail("should throw excepiton");
+    } catch (Exception e) {
+
+    }
+    BasicTable.drop(path, conf);
+  }
+
+  @Test
+  public void test13() throws IOException, ParseException {
+    /*
+     * Tests API, conf is null
+     */
+
+    Path path1 = new Path(path.toString() + "13");
+    TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+        "[a];[b];[c];[d];[e];[f]", path1, true, false);
+    try {
+      BasicTable.dropColumnGroup(path1, null, "CG0");
+      Assert.fail("should throw excepiton");
+    } catch (Exception e) {
+
+    }
+    BasicTable.drop(path1, conf);
+  }
+
+  @Test
+  public void test14() throws IOException, ParseException {
+    /*
+     * Tests API, CG name is empty string
+     */
+
+    Path path1 = new Path(path.toString() + "14");
+    TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+        "[a];[b];[c];[d];[e];[f]", path1, true, false);
+    try {
+      BasicTable.dropColumnGroup(path1, conf, "");
+      Assert.fail("should throw excepiton");
+    } catch (Exception e) {
+
+    }
+    BasicTable.drop(path1, conf);
+  }
+
+  @Test
+  public void test15() throws IOException, ParseException {
+    /*
+     * Tests API, CG name is null
+     */
+
+    Path path1 = new Path(path.toString() + "15");
+
+    TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+        "[a];[b];[c];[d];[e];[f]", path1, true, false);
+    try {
+      BasicTable.dropColumnGroup(path1, conf, null);
+      Assert.fail("should throw excepiton");
+    } catch (Exception e) {
+
+    }
+    BasicTable.drop(path1, conf);
+  }
+
+  @Test
+  public void test16() throws IOException, ParseException {
+    /*
+     * Tests delete same CG multiple times
+     */
+
+    Path path1 = new Path(path.toString() + "16");
+
+    int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+        "[a, b]; [c, d]", path1, true, false);
+
+    int rowsToRead = Math.min(10, numRows);
+
+    // normal table.
+    verifyScanner(path1, conf, "a, c, x", new boolean[] { false, false, true },
+        rowsToRead);
+
+    // Now delete CG1 ([c, d])
+    BasicTable.dropColumnGroup(path1, conf, "CG1");
+
+    // check various read cases.
+    verifyScanner(path1, conf, "c, a", new boolean[] { true, false },
+        rowsToRead);
+
+    // Now delete CG1 ([c, d]) again
+    BasicTable.dropColumnGroup(path1, conf, "CG1");
+
+    verifyScanner(path1, conf, "c, a", new boolean[] { true, false },
+        rowsToRead);
+    BasicTable.drop(path1, conf);
+  }
+
+  @Test
+  public void test17() throws IOException, ParseException {
+    /*
+     * test rangesplit
+     */
+    System.out.println("test 17");
+
+    Path path1 = new Path(path.toString() + "17");
+    TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f", "[a,b,c,d,e,f]",
+        path1, true, false);
+
+    BasicTable.dropColumnGroup(path1, conf, "CG0");
+
+    BasicTable.Reader reader = new BasicTable.Reader(path1, conf);
+    reader.setProjection("a, b, c, d, e, f");
+    List<RangeSplit> splits = reader.rangeSplit(1);
+    TableScanner scanner = null;
+    try {
+      scanner = reader.getScanner(splits.get(0), true);
+    } catch (Exception e) {
+      System.out.println("in test 17, getScanner");
+      e.printStackTrace();
+    }
+
+    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+    scanner.getValue(RowValue);
+
+    Assert.assertEquals(null, RowValue.get(0));
+    Assert.assertFalse(scanner.advance());
+    scanner.getValue(RowValue);
+    Assert.assertEquals(null, RowValue.get(0));
+    BasicTable.drop(path1, conf);
+  }
+
+  /**
+   * A thread that performs a DropColumnGroup.
+   */
+  class DropThread extends Thread {
+
+    private int id;
+
+    public DropThread(int id) {
+
+      this.id = id;
+
+    }
+
+    /**
+     * Executes DropColumnGroup.
+     */
+    public void run() {
+      try {
+        System.out.println("Droping CG: " + id);
+        BasicTable.dropColumnGroup(path, conf, "CG" + id);
+      } catch (Exception e) {
+        System.out.println(id + " - error: " + e);
+      }
+    }
+
+  }
+
+  /**
+   * A thread that reads from the table using a given projection.
+   */
+  class ReadThread extends Thread {
+
+    private int id;
+    private String projection;
+    private int numRowsToRead;
+
+    public ReadThread(int id, String projection, int numRowsToRead) {
+      this.id = id;
+      this.projection = projection;
+      this.numRowsToRead = numRowsToRead;
+
+    }
+
+    /**
+     * Opens a reader with this thread's projection and reads from the table.
+     */
+    public void run() {
+      BasicTable.Reader reader = null;
+      try {
+        reader = new BasicTable.Reader(path, conf);
+        reader.setProjection(projection);
+        TableScanner scanner = reader.getScanner(null, true);
+        Tuple row = TypesUtils.createTuple(reader.getSchema());
+        for (int i = 0; i < numRowsToRead; i++) {
+          scanner.getValue(row);
+        }
+        scanner.advance();
+        scanner.close();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+
+    }
+  }
+}


Reply via email to