Author: gates
Date: Fri Oct 16 22:15:59 2009
New Revision: 826110
URL: http://svn.apache.org/viewvc?rev=826110&view=rev
Log:
PIG-993 Ability to drop a column group in a table.
Added:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestColumnSecurity.java
Modified:
hadoop/pig/trunk/contrib/zebra/CHANGES.txt
hadoop/pig/trunk/contrib/zebra/build.xml
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Fri Oct 16 22:15:59 2009
@@ -5,6 +5,8 @@
INCOMPATIBLE CHANGES
IMPROVEMENTS
+
+ PIG-993 Ability to drop a column group in a table (yanz and rangadi via
gates)
PIG-992 Separate schema related files into a schema package (yanz via
gates)
Modified: hadoop/pig/trunk/contrib/zebra/build.xml
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/build.xml?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/build.xml (original)
+++ hadoop/pig/trunk/contrib/zebra/build.xml Fri Oct 16 22:15:59 2009
@@ -73,7 +73,7 @@
<target name="javadoc" depends="jar">
<mkdir dir="${build.javadoc}" />
<javadoc packagenames="org.apache.pig.*"
overview="${src.docs.dir}/overview.html" destdir="${build.javadoc}"
author="true" version="true" use="true" windowtitle="Hadoop Zebra API"
doctitle="Hadoop Zebra API" bottom="Copyright &copy; ${year} The Apache
Software Foundation">
- <packageset dir="${src.dir}"
excludes="**/examples,**/comparator/,**/io/" />
+ <packageset dir="${src.dir}" excludes="**/examples" />
<link href="${javadoc.link}" />
<classpath refid="classpath"/>
<!--
@@ -126,10 +126,16 @@
<classpath refid="test.classpath"/>
<formatter type="${test.junit.output.format}" />
- <batchtest todir="${build.test}">
+ <batchtest todir="${build.test}" unless="testcase">
<fileset dir="${src.test}"
includes="**/TestCheckin*.java"
excludes="**/${test.exclude}.java" />
</batchtest>
+ <batchtest todir="${build.test}" if="testcase">
+ <fileset dir="${src.test}"
+ includes="**/${testcase}.java"/>
+ </batchtest>
+
+
</junit>
<fail if="tests.failed">Tests failed!</fail>
</target>
Modified:
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
---
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
(original)
+++
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
Fri Oct 16 22:15:59 2009
@@ -22,15 +22,21 @@
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.io.StringReader;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -68,6 +74,9 @@
* </ul>
*/
public class BasicTable {
+
+ static Log LOG = LogFactory.getLog(BasicTable.class);
+
// name of the BasicTable schema file
private final static String BT_SCHEMA_FILE = ".btschema";
// schema version
@@ -80,12 +89,114 @@
// default comparator to "memcmp"
private final static String DEFAULT_COMPARATOR = TFile.COMPARATOR_MEMCMP;
+ private final static String DELETED_CG_PREFIX = ".deleted-";
+
// no public ctor for instantiating a BasicTable object
private BasicTable() {
// no-op
}
/**
+ * Deletes the data for column group specified by cgName.
+ * Readers that try to read the fields that were stored in the
+ * column group get null since the underlying data is removed.
+ * <br> <br>
+ *
+ * Effect on the readers that are currently reading from the table while
+ * a column group is dropped is unspecified. Suggested practice is to
+ * drop column groups when there are no readers or writers for the table.
+ * <br> <br>
+ *
+ * Column group names are usually specified in the "storage hint" while
+ * creating a table. If no name is specified, system assigns a simple name.
+ * These names could be obtained through "dumpInfo()" and other methods.
+ * <br> <br>
+ *
+ * Dropping a column group that has already been removed is a no-op;
+ * no exception is thrown.
+ *
+ * @param path path to BasicTable
+ * @param conf Configuration determines file system and other parameters.
+ * @param cgName name of the column group to drop.
+ * @throws IOException IOException could occur for various reasons. E.g.
+ * a user does not have permissions to write to table directory.
+ *
+ */
+ public static void dropColumnGroup(Path path, Configuration conf,
+ String cgName)
+ throws IOException {
+
+ FileSystem fs = FileSystem.get(conf);
+
+ SchemaFile schemaFile = new SchemaFile(path, conf);
+
+ int cgIdx = schemaFile.getCGByName(cgName);
+ if (cgIdx < 0) {
+ throw new IOException(path +
+ " : Could not find a column group with the name '" + cgName +
"'");
+ }
+
+ Path cgPath = new Path(path, schemaFile.getName(cgIdx));
+
+ //Clean up any previous unfinished attempts to drop column groups?
+
+ if (schemaFile.isCGDeleted(cgIdx)) {
+ // Clean up unfinished delete if it exists. so that clean up can
+ // complete if the previous deletion was interrupted for some reason.
+ if (fs.exists(cgPath)) {
+ LOG.info(path + " : " +
+ " clearing unfinished deletion of column group " +
+ cgName + ".");
+ fs.delete(cgPath, true);
+ }
+ LOG.info(path + " : column group " + cgName + " is already deleted.");
+ return;
+ }
+
+ // try to delete the column group:
+
+ // first check if the user has enough permissions to list the directory
+ fs.listStatus(cgPath);
+
+ //verify if the user has enough permissions by trying to create
+ //a temporary file in cg.
+ OutputStream out = fs.create(
+ new Path(cgPath, ".tmp" + DELETED_CG_PREFIX + cgName), true);
+ out.close();
+
+ //First try to create a file indicating a column group is deleted.
+ try {
+ Path deletedCGPath = new Path(path, DELETED_CG_PREFIX + cgName);
+ // create without overwriting.
+ out = fs.create(deletedCGPath, false);
+ // should we write anything?
+ out.close();
+ } catch (IOException e) {
+ // one remote possibility is that another user
+ // already deleted CG.
+ SchemaFile tempSchema = new SchemaFile(path, conf);
+ if (tempSchema.isCGDeleted(cgIdx)) {
+ LOG.info(path + " : " + cgName +
+ " is deleted by someone else. That is ok.");
+ return;
+ }
+ // otherwise, it is some other error.
+ throw e;
+ }
+
+ // At this stage, the CG is marked deleted. Now just try to
+ // delete the actual directory:
+ if (!fs.delete(cgPath, true)) {
+ String msg = path + " : Could not detete column group " +
+ cgName + ". It is marked deleted.";
+ LOG.warn(msg);
+ throw new IOException(msg);
+ }
+
+ LOG.info("Dropped " + cgName + " from " + path);
+ }
+
+ /**
* BasicTable reader.
*/
public static class Reader implements Closeable {
@@ -96,6 +207,7 @@
boolean inferredMapping;
private MetaFile.Reader metaReader;
private BasicTableStatus status;
+ private int firstValidCG = -1; /// First column group that exists.
Partition partition;
ColumnGroup.Reader[] colGroups;
Tuple[] cgTuples;
@@ -103,9 +215,20 @@
private synchronized void checkInferredMapping() throws ParseException,
IOException {
if (!inferredMapping) {
for (int i = 0; i < colGroups.length; ++i) {
- colGroups[i].setProjection(partition.getProjection(i));
- if (partition.isCGNeeded(i))
- cgTuples[i] = TypesUtils.createTuple(colGroups[i].getSchema());
+ if (colGroups[i] != null) {
+ colGroups[i].setProjection(partition.getProjection(i));
+ }
+ if (partition.isCGNeeded(i)) {
+ if (isCGDeleted(i)) {
+ // this is a deleted column group. Warn about it.
+ LOG.warn("Trying to read from deleted column group " +
+ schemaFile.getName(i) +
+ ". NULL is returned for corresponding columns. " +
+ "Table at " + path);
+ } else {
+ cgTuples[i] = TypesUtils.createTuple(colGroups[i].getSchema());
+ }
+ }
else
cgTuples[i] = null;
}
@@ -119,6 +242,13 @@
}
/**
+ * Returns true if a column group is deleted.
+ */
+ private boolean isCGDeleted(int nx) {
+ return colGroups[nx] == null;
+ }
+
+ /**
* Create a BasicTable reader.
*
* @param path
@@ -143,15 +273,19 @@
String storage = schemaFile.getStorageString();
partition = new Partition(schema, projection, storage);
for (int nx = 0; nx < numCGs; nx++) {
- colGroups[nx] =
- new ColumnGroup.Reader(new Path(path,
partition.getCGSchema(nx).getName()),
- conf);
- if (partition.isCGNeeded(nx))
+ if (!schemaFile.isCGDeleted(nx)) {
+ colGroups[nx] =
+ new ColumnGroup.Reader(new Path(path,
partition.getCGSchema(nx).getName()),
+ conf);
+ if (firstValidCG < 0) {
+ firstValidCG = nx;
+ }
+ }
+ if (colGroups[nx] != null && partition.isCGNeeded(nx))
cgTuples[nx] = TypesUtils.createTuple(colGroups[nx].getSchema());
else
cgTuples[nx] = null;
}
- partition.setSource(cgTuples);
buildStatus();
closed = false;
}
@@ -254,8 +388,10 @@
throws IOException {
BlockDistribution bd = new BlockDistribution();
for (int nx = 0; nx < colGroups.length; nx++) {
- bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split
+ if (!isCGDeleted(nx)) {
+ bd.add(colGroups[nx].getBlockDistribution(split == null ? null :
split
.get(nx)));
+ }
}
return bd;
}
@@ -275,7 +411,9 @@
KeyDistribution kd =
new
KeyDistribution(TFile.makeComparator(schemaFile.getComparator()));
for (int nx = 0; nx < colGroups.length; nx++) {
- kd.add(colGroups[nx].getKeyDistribution(n));
+ if (!isCGDeleted(nx)) {
+ kd.add(colGroups[nx].getKeyDistribution(n));
+ }
}
if (kd.size() > (int) (n * 1.5)) {
kd.resize(n);
@@ -384,12 +522,16 @@
List<CGRangeSplit>[] cgSplitsAll = new ArrayList[colGroups.length];
// split each CG
for (int nx = 0; nx < colGroups.length; nx++) {
- cgSplitsAll[nx] = colGroups[nx].rangeSplit(n);
+ if (!isCGDeleted(nx))
+ cgSplitsAll[nx] = colGroups[nx].rangeSplit(n);
}
// verify all CGs have same number of slices
int numSlices = -1;
for (int nx = 0; nx < cgSplitsAll.length; nx++) {
+ if (isCGDeleted(nx)) {
+ continue;
+ }
if (numSlices < 0) {
numSlices = cgSplitsAll[nx].size();
}
@@ -398,12 +540,22 @@
"BasicTable's column groups were not equally split.");
}
}
+ if (numSlices <= 0) {
+ // This could happen because of various reasons.
+ // One possibility is that all the CGs are deleted.
+ numSlices = 1;
+ }
// return horizontal slices as RangeSplits
List<RangeSplit> ret = new ArrayList<RangeSplit>(numSlices);
for (int slice = 0; slice < numSlices; slice++) {
CGRangeSplit[] oneSliceSplits = new CGRangeSplit[cgSplitsAll.length];
for (int cgIndex = 0; cgIndex < cgSplitsAll.length; cgIndex++) {
- oneSliceSplits[cgIndex] = cgSplitsAll[cgIndex].get(slice);
+ if (isCGDeleted(cgIndex)) {
+ // set a dummy split
+ oneSliceSplits[cgIndex] = new CGRangeSplit(0, 0);
+ } else {
+ oneSliceSplits[cgIndex] = cgSplitsAll[cgIndex].get(slice);
+ }
}
ret.add(new BasicTable.Reader.RangeSplit(oneSliceSplits));
}
@@ -420,7 +572,9 @@
closed = true;
metaReader.close();
for (int i = 0; i < colGroups.length; ++i) {
- colGroups[i].close();
+ if (colGroups[i] != null) {
+ colGroups[i].close();
+ }
}
}
finally {
@@ -452,12 +606,20 @@
private void buildStatus() {
status = new BasicTableStatus();
- status.beginKey = colGroups[0].getStatus().getBeginKey();
- status.endKey = colGroups[0].getStatus().getEndKey();
- status.rows = colGroups[0].getStatus().getRows();
+ if (firstValidCG >= 0) {
+ status.beginKey = colGroups[firstValidCG].getStatus().getBeginKey();
+ status.endKey = colGroups[firstValidCG].getStatus().getEndKey();
+ status.rows = colGroups[firstValidCG].getStatus().getRows();
+ } else {
+ status.beginKey = new BytesWritable(new byte[0]);
+ status.endKey = status.beginKey;
+ status.rows = 0;
+ }
status.size = 0;
for (int nx = 0; nx < colGroups.length; nx++) {
- status.size += colGroups[nx].getStatus().getSize();
+ if (colGroups[nx] != null) {
+ status.size += colGroups[nx].getStatus().getSize();
+ }
}
}
@@ -549,14 +711,18 @@
schema = partition.getProjection();
cgScanners = new TableScanner[colGroups.length];
for (int i = 0; i < colGroups.length; ++i) {
- // if no CG is needed explicitly by projection but the "countRow"
still needs to access some column group
- if (partition.isCGNeeded(i) || (!anyScanner && (i ==
colGroups.length-1)))
+ if (!isCGDeleted(i) && partition.isCGNeeded(i))
{
anyScanner = true;
cgScanners[i] = colGroups[i].getScanner(beginKey, endKey, false);
} else
cgScanners[i] = null;
}
+ if (!anyScanner && firstValidCG >= 0) {
+ // if no CG is needed explicitly by projection but the "countRow"
still needs to access some column group
+ cgScanners[firstValidCG] = colGroups[firstValidCG].
+ getScanner(beginKey, endKey, false);
+ }
this.closeReader = closeReader;
sClosed = false;
}
@@ -591,7 +757,7 @@
boolean anyScanner = false;
for (int i = 0; i < colGroups.length; ++i) {
// if no CG is needed explicitly by projection but the "countRow"
still needs to access some column group
- if (partition.isCGNeeded(i) || (!anyScanner && (i ==
colGroups.length-1)))
+ if (!isCGDeleted(i) && partition.isCGNeeded(i))
{
cgScanners[i] =
colGroups[i].getScanner(split == null ? null : split.get(i),
@@ -600,6 +766,11 @@
} else
cgScanners[i] = null;
}
+ if (!anyScanner && firstValidCG >= 0) {
+ // if no CG is needed explicitly by projection but the "countRow"
still needs to access some column group
+ cgScanners[firstValidCG] = colGroups[firstValidCG].
+ getScanner(split == null ? null : split.get(firstValidCG),
false);
+ }
this.partition = partition;
this.closeReader = closeReader;
sClosed = false;
@@ -1232,7 +1403,12 @@
boolean sorted;
String storage;
CGSchema[] cgschemas;
-
+
+ // Array indicating if a physical schema is already dropped
+ // It is probably better to create "CGProperties" class and
+ // store multiple properties like name there.
+ boolean[] cgDeletedFlags;
+
// ctor for reading
public SchemaFile(Path path, Configuration conf) throws IOException {
readSchemaFile(path, conf);
@@ -1260,6 +1436,7 @@
for (int nx = 0; nx < cgschemas.length; nx++) {
physical[nx] = cgschemas[nx].getSchema();
}
+ cgDeletedFlags = new boolean[physical.length];
this.sorted = sorted;
version = SCHEMA_VERSION;
@@ -1303,6 +1480,24 @@
return cgschemas[nx].getCompressor();
}
+ /**
+ * Returns the index for CG with the given name.
+ * -1 indicates that there is no CG with the name.
+ */
+ int getCGByName(String cgName) {
+ for(int i=0; i<physical.length; i++) {
+ if (cgName.equals(getName(i))) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ /** Returns true if the CG at the given index is deleted. */
+ boolean isCGDeleted(int idx) {
+ return cgDeletedFlags[idx];
+ }
+
public String getOwner(int nx) {
return cgschemas[nx].getOwner();
}
@@ -1315,7 +1510,6 @@
return cgschemas[nx].getPerm();
}
-
/**
* @return the string representation of the physical schema.
*/
@@ -1377,8 +1571,10 @@
catch (Exception e) {
throw new IOException("Partition constructor failed :" +
e.getMessage());
}
+ cgschemas = partition.getCGSchemas();
int numCGs = WritableUtils.readVInt(in);
physical = new Schema[numCGs];
+ cgDeletedFlags = new boolean[physical.length];
TableSchemaParser parser;
String cgschemastr;
try {
@@ -1392,12 +1588,37 @@
throw new IOException("parser.RecordSchema failed :" + e.getMessage());
}
sorted = WritableUtils.readVInt(in) == 1 ? true : false;
+ setCGDeletedFlags(path, conf);
in.close();
}
private static Path makeSchemaFilePath(Path parent) {
return new Path(parent, BT_SCHEMA_FILE);
}
+
+ /**
+ * Sets cgDeletedFlags array by checking presence of
+ * ".deleted-CGNAME" directory in the table top level
+ * directory.
+ */
+ void setCGDeletedFlags(Path path, Configuration conf) throws IOException {
+
+ Set<String> deletedCGs = new HashSet<String>();
+
+ for (FileStatus file : path.getFileSystem(conf).listStatus(path)) {
+ if (!file.isDir()) {
+ String fname = file.getPath().getName();
+ if (fname.startsWith(DELETED_CG_PREFIX)) {
+ deletedCGs.add(fname.substring(DELETED_CG_PREFIX.length()));
+ }
+ }
+ }
+
+ for(int i=0; i<physical.length; i++) {
+ cgDeletedFlags[i] =
+ deletedCGs.contains(getName(i));
+ }
+ }
}
static public void dumpInfo(String file, PrintStream out, Configuration conf)
@@ -1421,7 +1642,15 @@
for (int nx = 0; nx < reader.colGroups.length; nx++) {
IOutils.indent(out, indent);
out.printf("\nColumn Group [%d] :", nx);
- ColumnGroup.dumpInfo(reader.colGroups[nx].path, out, conf, indent);
+ if (reader.colGroups[nx] != null) {
+ ColumnGroup.dumpInfo(reader.colGroups[nx].path, out, conf, indent);
+ } else {
+ // print basic info for deleted column groups.
+ out.printf("\nColum Group : DELETED");
+ out.printf("\nName : %s", reader.schemaFile.getName(nx));
+ out.printf("\nSchema : %s\n",
+ reader.schemaFile.cgschemas[nx].getSchema().toString());
+ }
}
}
catch (Exception e) {
Modified:
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
---
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
(original)
+++
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
Fri Oct 16 22:15:59 2009
@@ -426,6 +426,12 @@
src.setProjIndex(mSize++);
}
+ void cleanup()
+ {
+ for (int i = 0; i < mSources.size(); i++)
+ mSources.get(i).cleanup();
+ }
+
void insert(final BytesWritable key) throws ExecException {
for (int i = 0; i < mSize; i++)
((Tuple) mTuple).set(mSources.get(i).getProjIndex(), mSources.get(i)
@@ -488,6 +494,7 @@
private Object mTuple = null;
private boolean mNeedTmpTuple;
private HashSet<String> mKeys; // interested hash keys
+ private PartitionedColumn parent = null;
PartitionedColumn(int fi, boolean needTmpTuple)
throws IOException {
@@ -505,6 +512,10 @@
mKeys = keys;
}
+ private void setParent(PartitionedColumn parent) {
+ this.parent = parent;
+ }
+
/**
* stitch op
*/
@@ -580,6 +591,28 @@
if (mChildren == null) mChildren = new ArrayList<PartitionedColumn>();
mChildren.add(child);
mChildrenLen++;
+ child.setParent(this);
+ }
+
+ void cleanup() {
+ if (parent != null) {
+ parent.removeChild(this);
+ }
+ if (mNeedTmpTuple && mTuple != null)
+ mTuple = null;
+ }
+
+ void removeChild(PartitionedColumn child)
+ {
+ for (int i = 0; i < mChildrenLen; i++)
+ {
+ if (mChildren.get(i) == child)
+ {
+ mChildren.remove(i);
+ mChildrenLen--;
+ i--;
+ }
+ }
}
void setProjIndex(int projindex) {
@@ -1025,8 +1058,7 @@
cgentry = getCGEntry(getCGIndex(child).getCGIndex());
PartitionedColumn mapParCol =
- new PartitionedColumn(i, Partition.SplitType.MAP, true);
- mPCNeedTmpTuple.add(mapParCol);
+ new PartitionedColumn(i, Partition.SplitType.MAP, false);
cgentry.addUser(mapParCol, getCGName(child));
mExecs.add(mapParCol); // not a leaf : MAP stitch needed
mStitchSize++;
@@ -1047,12 +1079,11 @@
if (!projectedCGs.contains(index))
{
PartitionedColumn parCol =
- new PartitionedColumn(0, true);
- mPCNeedTmpTuple.add(parCol);
+ new PartitionedColumn(0, false);
+ // mPCNeedTmpTuple.add(parCol);
cgentry.addUser(parCol, getCGName(child), cgindex.getKeys());
mapParCol.addChild(parCol); // contribute to the non-key-partitioned
// hashes
- mPCNeedMap.add(parCol);
projectedCGs.add(index);
}
}
@@ -1079,19 +1110,17 @@
cgentry = getCGEntry(mapentry.getKey().getCGIndex());
if (needParent)
{
- parCol = new PartitionedColumn(i, Partition.SplitType.MAP, true);
+ parCol = new PartitionedColumn(i, Partition.SplitType.MAP, false);
mExecs.add(parCol); // not a leaf : MAP stitch needed
mStitchSize++;
- mPCNeedMap.add(parCol);
parent.addChild(parCol);
parent = parCol;
needParent = false;
newParent = true;
} else {
- parCol = new PartitionedColumn(newParent ? 0 : i, true);
+ parCol = new PartitionedColumn(newParent ? 0 : i, false);
parent.addChild(parCol);
}
- mPCNeedTmpTuple.add(parCol);
cgentry.addUser(parCol, getCGName(child), projectedKeys);
}
}
@@ -1250,7 +1279,14 @@
throw new ParseException(
"Internal Logical Error: Invalid number of column groups");
for (int i = 0; i < tuples.length; i++) {
- if (mCGs.get(i) != null) mCGs.get(i).setSource(tuples[i]);
+ if (mCGs.get(i) != null) {
+ if (tuples[i] == null) {
+ mCGs.get(i).cleanup();
+ mCGs.remove(i);
+ } else {
+ mCGs.get(i).setSource(tuples[i]);
+ }
+ }
}
}
Modified:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
---
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
(original)
+++
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
Fri Oct 16 22:15:59 2009
@@ -69,19 +69,19 @@
public static void tearDownOnce() throws IOException {
}
- BytesWritable makeRandomKey(int max) {
+ static BytesWritable makeRandomKey(int max) {
return makeKey(random.nextInt(max));
}
- BytesWritable makeKey(int i) {
+ static BytesWritable makeKey(int i) {
return new BytesWritable(String.format("key%09d", i).getBytes());
}
- String makeString(String prefix, int max) {
+ static String makeString(String prefix, int max) {
return String.format("%s%09d", prefix, random.nextInt(max));
}
- int createBasicTable(int parts, int rows, String strSchema, String storage,
+ static int createBasicTable(int parts, int rows, String strSchema, String
storage,
Path path, boolean properClose, boolean sorted) throws IOException {
if (fs.exists(path)) {
BasicTable.drop(path, conf);
@@ -133,7 +133,7 @@
return total;
}
- void rangeSplitBasicTable(int numSplits, int totalRows, String strProjection,
+ static void rangeSplitBasicTable(int numSplits, int totalRows, String
strProjection,
Path path) throws IOException, ParseException {
BasicTable.Reader reader = new BasicTable.Reader(path, conf);
reader.setProjection(strProjection);
@@ -153,7 +153,7 @@
// TODO: verify tuples contains the right projected values
}
- void doRangeSplit(int[] numSplits, int totalRows, String projection, Path
path)
+ static void doRangeSplit(int[] numSplits, int totalRows, String projection,
Path path)
throws IOException, ParseException {
for (int i : numSplits) {
if (i > 0) {
@@ -162,7 +162,7 @@
}
}
- void keySplitBasicTable(int numSplits, int totalRows, String strProjection,
+ static void keySplitBasicTable(int numSplits, int totalRows, String
strProjection,
Path path) throws IOException, ParseException {
BasicTable.Reader reader = new BasicTable.Reader(path, conf);
reader.setProjection(strProjection);
@@ -211,7 +211,7 @@
Assert.assertEquals(total, totalRows);
}
- void doKeySplit(int[] numSplits, int totalRows, String projection, Path path)
+ static void doKeySplit(int[] numSplits, int totalRows, String projection,
Path path)
throws IOException, ParseException {
for (int i : numSplits) {
if (i > 0) {
@@ -220,7 +220,7 @@
}
}
- BasicTableStatus getStatus(Path path) throws IOException {
+ static BasicTableStatus getStatus(Path path) throws IOException {
BasicTable.Reader reader = new BasicTable.Reader(path, conf);
try {
return reader.getStatus();
@@ -229,7 +229,7 @@
}
}
- void doReadWrite(Path path, int parts, int rows, String schema,
+ static void doReadWrite(Path path, int parts, int rows, String schema,
String storage, String projection, boolean properClose, boolean sorted)
throws IOException, ParseException {
int totalRows = createBasicTable(parts, rows, schema, storage, path,
@@ -261,7 +261,7 @@
doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, true);
}
- int doReadOnly(TableScanner scanner) throws IOException, ParseException {
+ static int doReadOnly(TableScanner scanner) throws IOException,
ParseException {
int total = 0;
BytesWritable key = new BytesWritable();
Tuple value = TypesUtils.createTuple(scanner.getSchema());
Modified:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java?rev=826110&r1=826109&r2=826110&view=diff
==============================================================================
---
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
(original)
+++
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCheckin.java
Fri Oct 16 22:15:59 2009
@@ -36,6 +36,7 @@
TestColumnGroupReaders.class,
TestColumnGroupSchemas.class,
TestColumnGroupSplits.class,
+ TestDropColumnGroup.class,
TestMap.class,
TestMapOfRecord.class,
TestMixedType1.class,
Added:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java?rev=826110&view=auto
==============================================================================
---
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
(added)
+++
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
Fri Oct 16 22:15:59 2009
@@ -0,0 +1,797 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import java.util.List;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.parser.ParseException;
+
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+import org.junit.AfterClass;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestDropColumnGroup {
+ Log LOG = LogFactory.getLog(TestDropColumnGroup.class);
+ private static Path path;
+ private static Configuration conf;
+
+ @BeforeClass
+ public static void setUpOnce() throws IOException {
+ TestBasicTable.setUpOnce();
+ path = new Path(TestBasicTable.rootPath, "DropCGTest");
+ conf = TestBasicTable.conf;
+ Log LOG = LogFactory.getLog(TestDropColumnGroup.class);
+
+ }
+
  @AfterClass
  public static void tearDown() throws IOException {
    // Remove the shared test table once the whole suite has finished.
    BasicTable.drop(path, conf);
  }
+
+ /**
+ * Utitility function to open a table with a given projection and verify that
+ * certain fields in the returned tuple are null and certain fields are not.
+ */
+ void verifyScanner(Path path, Configuration conf, String projection,
+ boolean isNullExpected[], int numRowsToRead) throws IOException,
+ ParseException {
+
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ TableScanner scanner = reader.getScanner(null, true);
+
+ Tuple row = TypesUtils.createTuple(reader.getSchema());
+
+ for (int i = 0; i < numRowsToRead; i++) {
+ scanner.getValue(row);
+ for (int f = 0; f < isNullExpected.length; f++) {
+ if (isNullExpected[f] ^ row.get(f) == null) {
+ throw new IOException("Verification failure at field " + f + " row "
+ + i + " : expected " + (isNullExpected[f] ? "NULL" : "nonNULL")
+ + " but got opposite.");
+
+ }
+ }
+ scanner.advance();
+ }
+
+ scanner.close();
+ }
+
+ int countRows(Path path, Configuration conf, String projection)
+ throws IOException, ParseException {
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ TableScanner scanner = reader.getScanner(null, true);
+ int count = 0;
+ while (!scanner.atEnd()) {
+ count++;
+ scanner.advance();
+ }
+ scanner.close();
+ return count;
+ }
+
+ @Test
+ public void testDropColumnGroup() throws IOException, ParseException {
+ /*
+ * Tests basic drop columns feature. Also tests that fields in dropped
+ * columns can be read the value returned is null.
+ */
+
+ BasicTable.drop(path, conf);
+
+ int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+ "[a, b]; [c, d]", path, true, false);
+
+ int rowsToRead = Math.min(10, numRows);
+
+ // normal table.
+ verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+ rowsToRead);
+
+ // Now delete ([c, d)
+ BasicTable.dropColumnGroup(path, conf, "CG1");
+
+ // check various read cases.
+ verifyScanner(path, conf, "c, a", new boolean[] { true, false },
rowsToRead);
+ verifyScanner(path, conf, "c, a", new boolean[] { true, false },
rowsToRead);
+
+ verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, false,
+ false, false, true, false }, rowsToRead);
+
+ BasicTable.dumpInfo(path.toString(), System.err, conf);
+
+ // Drop CG0 ([a, b])
+ BasicTable.dropColumnGroup(path, conf, "CG0");
+
+ verifyScanner(path, conf, "a, b", new boolean[] { true, true },
rowsToRead);
+
+ // Drop remaining CG2
+ BasicTable.dropColumnGroup(path, conf, "CG2");
+
+ verifyScanner(path, conf, "a, b, c, d, e, f", new boolean[] { true, true,
+ true, true, true, true }, rowsToRead);
+
+ // Now make sure the reader reports zero rows.
+ Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+
+ // delete the table
+ BasicTable.drop(path, conf);
+
+ /*
+ * Try similar tests with range splits.
+ */
+
+ // 5 splits and 50 rows
+ numRows = TestBasicTable.createBasicTable(5, 50, "a, b, c, d, e, f",
+ "[a, b]; [c, d]; [e] as myCG", path, true, false);
+
+ BasicTable.dropColumnGroup(path, conf, "myCG");
+
+ verifyScanner(path, conf, "e, c, g, b", new boolean[] { true, false, true,
+ false }, numRows);
+
+ TestBasicTable.doRangeSplit(new int[] { 4, 0, 2 }, numRows,
+ "a, b, e, f, x", path);
+
+ // Remove another CG.
+ BasicTable.dropColumnGroup(path, conf, "CG0");
+
+ TestBasicTable.doRangeSplit(new int[] { 4, 0, 2, 3, 1 }, numRows,
+ "a, y, e, f, x", path);
+
+ BasicTable.drop(path, conf);
+ }
+
  @Test
  public void testDropColumnGroupsMixedTypes() throws IOException,
      ParseException {
    // Exercises dropColumnGroup on a schema mixing scalars, nested records,
    // maps, a map of maps, and a collection. Two distinct rows are written
    // (the second repeated 10 times), then column groups are dropped one by
    // one and the projections are verified.

    String mixedSchema = /* roughly borrowed from testMixedType1.java */
    "s1:bool, s2:int, s3:long, s4:float, s5:string, s6:bytes, "
        + "r1:record(f1:int, f2:long), r2:record(r3:record(f3:float, f4)),"
        + "m1:map(string),m2:map(map(int)), "
        + "c:collection(f13:double, f14:float, f15:bytes)";
    // [s1, s2]; [m1#{a}]; [r1.f1]; [s3, s4, r2.r3.f3]; [s5, s6, m2#{x|y}];
    // [r1.f2, m1#{b}]; [r2.r3.f4, m2#{z}]";
    String mixedStorageHint = "[s1, s2] as simpleCG; "
        + "[m1#{a}, s3] as mapCG; "
        + "[s4, r2.r3.f3, r1.f1] as recordCG; "
        + "[c] as collectionCG; "
        + "[r1.f2, m1#{b}, m2#{z}] as mapRecordCG; ";

    Path path = new Path(TestBasicTable.rootPath, "DropCGTest");
    Configuration conf = TestBasicTable.conf;
    conf.set("fs.default.name", "file:///");

    BasicTable.drop(path, conf);

    // first write the table :
    BasicTable.Writer writer = new BasicTable.Writer(path, mixedSchema,
        mixedStorageHint, false, conf);
    writer.finish();

    Schema schema = writer.getSchema();
    Tuple tuple = TypesUtils.createTuple(schema);
    // Reopen the table for insertion through a second writer.
    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
    int part = 0;
    TableInserter inserter = writer1.getInserter("part" + part, true);
    TypesUtils.resetTuple(tuple);

    // Tuples backing the record-typed columns r1 and r2; tupRecord3 is the
    // inner record r2.r3.
    Tuple tupRecord1 = TypesUtils.createTuple(schema.getColumnSchema("r1")
        .getSchema());
    Tuple tupRecord2 = TypesUtils.createTuple(schema.getColumnSchema("r2")
        .getSchema());

    Tuple tupRecord3 = TypesUtils.createTuple(new Schema("f3:float, f4"));

    // row 1
    tuple.set(0, true); // bool
    tuple.set(1, 1); // int
    tuple.set(2, 1001L); // long
    tuple.set(3, 1.1); // float
    tuple.set(4, "hello world 1"); // string
    tuple.set(5, new DataByteArray("hello byte 1")); // byte

    // r1:record(f1:int, f2:long
    tupRecord1.set(0, 1);
    tupRecord1.set(1, 1001L);
    tuple.set(6, tupRecord1);

    // r2:record(r3:record(f3:float, f4))
    // Note: tupRecord3 is placed into tupRecord2 first and then populated;
    // the tuple holds a reference, so the later sets are still visible.
    tupRecord2.set(0, tupRecord3);
    tupRecord3.set(0, 1.3);
    tupRecord3.set(1, new DataByteArray("r3 row 1 byte array "));
    tuple.set(7, tupRecord2);

    // m1:map(string)
    Map<String, String> m1 = new HashMap<String, String>();
    m1.put("a", "A");
    m1.put("b", "B");
    m1.put("c", "C");
    tuple.set(8, m1);

    // m2:map(map(int))
    HashMap<String, Map<String, Integer>> m2 = new HashMap<String, Map<String, Integer>>();
    Map<String, Integer> m3 = new HashMap<String, Integer>();
    m3.put("m311", 311);
    m3.put("m321", 321);
    m3.put("m331", 331);
    Map<String, Integer> m4 = new HashMap<String, Integer>();
    m4.put("m411", 411);
    m4.put("m421", 421);
    m4.put("m431", 431);
    m2.put("x", m3);
    m2.put("y", m4);
    tuple.set(9, m2);

    // c:collection(f13:double, f14:float, f15:bytes)
    DataBag bagColl = TypesUtils.createBag();
    Schema schColl = schema.getColumn(10).getSchema();
    Tuple tupColl1 = TypesUtils.createTuple(schColl);
    Tuple tupColl2 = TypesUtils.createTuple(schColl);
    byte[] abs1 = new byte[3];
    byte[] abs2 = new byte[4];
    tupColl1.set(0, 3.1415926);
    tupColl1.set(1, 1.6);
    abs1[0] = 11;
    abs1[1] = 12;
    abs1[2] = 13;
    tupColl1.set(2, new DataByteArray(abs1));
    bagColl.add(tupColl1);
    tupColl2.set(0, 123.456789);
    tupColl2.set(1, 100);
    abs2[0] = 21;
    abs2[1] = 22;
    abs2[2] = 23;
    abs2[3] = 24;
    tupColl2.set(2, new DataByteArray(abs2));
    bagColl.add(tupColl2);
    tuple.set(10, bagColl);

    int row = 0;
    // Keys are "k<part><row>" so they sort in insertion order.
    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
        .getBytes()), tuple);
    // row 2
    row++;
    TypesUtils.resetTuple(tuple);
    TypesUtils.resetTuple(tupRecord1);
    TypesUtils.resetTuple(tupRecord2);
    TypesUtils.resetTuple(tupRecord3);
    m1.clear();
    m2.clear();
    m3.clear();
    m4.clear();
    tuple.set(0, false);
    tuple.set(1, 2); // int
    tuple.set(2, 1002L); // long
    tuple.set(3, 3.1); // float
    tuple.set(4, "hello world 2"); // string
    tuple.set(5, new DataByteArray("hello byte 2")); // byte

    // r1:record(f1:int, f2:long
    tupRecord1.set(0, 2);

    tupRecord1.set(1, 1002L);
    tuple.set(6, tupRecord1);

    // r2:record(r3:record(f3:float, f4))
    tupRecord2.set(0, tupRecord3);
    tupRecord3.set(0, 2.3);
    tupRecord3.set(1, new DataByteArray("r3 row2 byte array"));
    tuple.set(7, tupRecord2);

    // m1:map(string)
    m1.put("a2", "A2");
    m1.put("b2", "B2");
    m1.put("c2", "C2");
    tuple.set(8, m1);

    // m2:map(map(int))
    m3.put("m321", 321);
    m3.put("m322", 322);
    m3.put("m323", 323);
    m2.put("z", m3);
    tuple.set(9, m2);

    // c:collection(f13:double, f14:float, f15:bytes)
    bagColl.clear();
    TypesUtils.resetTuple(tupColl1);
    TypesUtils.resetTuple(tupColl2);
    tupColl1.set(0, 7654.321);
    tupColl1.set(1, 0.0001);
    abs1[0] = 31;
    abs1[1] = 32;
    abs1[2] = 33;
    tupColl1.set(2, new DataByteArray(abs1));
    bagColl.add(tupColl1);
    tupColl2.set(0, 0.123456789);
    tupColl2.set(1, 0.3333);
    abs2[0] = 41;
    abs2[1] = 42;
    abs2[2] = 43;
    abs2[3] = 44;
    tupColl2.set(2, new DataByteArray(abs2));
    bagColl.add(tupColl2);
    tuple.set(10, bagColl);

    // Write same row 10 times:
    for (int i = 0; i < 10; i++) {
      inserter.insert(new BytesWritable(String.format("k%d%d", part + 1 + i,
          row + 1 + i).getBytes()), tuple);
    }

    inserter.close();
    writer1.finish();

    writer.close();

    // 1 distinct first row + 10 copies of the second row.
    int numRows = 11;
    // drop mapCG: removes [m1#{a}, s3]
    BasicTable.dropColumnGroup(path, conf, "mapCG");

    // m1 itself survives (only the m1#{a} key was in the dropped CG).
    verifyScanner(path, conf, "m1", new boolean[] { false }, numRows);

    verifyScanner(path, conf, "s1, m1#{a}, m1#{b}, s3, s4", new boolean[] {
        false, true, false, true, false }, numRows);

    // drop simpleCG : removes [s1, s2]
    BasicTable.dropColumnGroup(path, conf, "simpleCG");
    verifyScanner(path, conf, "s1, m1#{a}, s2, m1#{b}", new boolean[] { true,
        true, true, false }, numRows);

    // drop mapRecordCG : removes [r1.f2, m1#{b}, m2#{z}];\
    BasicTable.dropColumnGroup(path, conf, "mapRecordCG");
    verifyScanner(path, conf, "r1.f1, r1.f2, m1#{a}, s5, m1#{b}",
        new boolean[] { false, true, true, false, true }, numRows);

    // drop collectionCG : removes c;\
    BasicTable.dropColumnGroup(path, conf, "collectionCG");
    verifyScanner(path, conf, "c.f1, c.f2, c.f3", new boolean[] { true, true,
        true }, numRows);

    // clean up the table
    BasicTable.drop(path, conf);
  }
+
+ @Test
+ public void test2() throws IOException, ParseException {
+ /*
+ * Tests concurrent drop CGs
+ */
+
+ BasicTable.drop(path, conf);
+
+ int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+ "[a];[b];[c];[d];[e];[f]", path, true, false);
+ System.out.println("Frist dump:");
+ BasicTable.dumpInfo(path.toString(), System.out, conf);
+ int rowsToRead = Math.min(10, numRows);
+
+ // normal table.
+ verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+ rowsToRead);
+
+ // create a thread for each dropCG
+ DropThread[] threads = new DropThread[6];
+
+ for (int i = 0; i < threads.length; i++) {
+
+ threads[i] = new DropThread(i);
+ }
+
+ // start the threads
+ for (int j = 0; j < threads.length; j++) {
+ threads[j].start();
+ }
+
+ for (Thread thr : threads) {
+ try {
+ thr.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ // check various read cases.
+
+ verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, true,
+ true, true, true, true }, rowsToRead);
+ System.out.println("second dump");
+ BasicTable.dumpInfo(path.toString(), System.out, conf);
+
+ // Now make sure the reader reports zero rows.
+ Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+
+ // delete the table
+ BasicTable.drop(path, conf);
+
+ }
+
+ @Test
+ public void test3() throws IOException, ParseException {
+ /*
+ * Tests concurrrent drop CGs while one fails
+ */
+
+ BasicTable.drop(path, conf);
+
+ int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+ "[a];[b];[c];[d];[e];[f]", path, true, false);
+ System.out.println("Frist dump:");
+ BasicTable.dumpInfo(path.toString(), System.out, conf);
+ int rowsToRead = Math.min(10, numRows);
+
+ // normal table.
+ verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+ rowsToRead);
+
+ // create a thread for each dropCG
+ DropThread[] threads = new DropThread[7];
+
+ for (int i = 0; i < threads.length; i++) {
+
+ threads[i] = new DropThread(i);
+ }
+
+ // start the threads
+ for (int j = 0; j < threads.length; j++) {
+ threads[j].start();
+ }
+
+ for (Thread thr : threads) {
+ try {
+ thr.join();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // check various read cases.
+
+ verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, true,
+ true, true, true, true }, rowsToRead);
+ System.out.println("second dump");
+ BasicTable.dumpInfo(path.toString(), System.out, conf);
+
+ // Now make sure the reader reports zero rows.
+ Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+
+ // delete the table
+ BasicTable.drop(path, conf);
+ }
+
+ @Test
+ public void test5() throws IOException, ParseException {
+ /*
+ * Tests drop CGs while reading the same CGs
+ */
+
+ System.out.println("######int test 5");
+ BasicTable.drop(path, conf);
+
+ int numRows = TestBasicTable.createBasicTable(1, 100000,
+ "a, b, c, d, e, f", "[a, b]; [c, d]", path, true, false);
+
+ System.out.println("in test5 , dump infor 1");
+ BasicTable.dumpInfo(path.toString(), System.out, conf);
+
+ int minRowsToRead = 10000;
+ int numOfReadThreads = 20;
+ int rowsToRead = Math.min(minRowsToRead, numRows);
+
+ // normal table.
+ verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+ rowsToRead);
+
+ // create a thread for each dropCG
+ DropThread[] dropThreads = new DropThread[3];
+
+ for (int i = 0; i < dropThreads.length; i++) {
+
+ dropThreads[i] = new DropThread(i);
+ }
+
+ // start the threads
+ for (int j = 0; j < dropThreads.length; j++) {
+ dropThreads[j].start();
+ }
+
+ // create read threads
+ ReadThread[] readThreads = new ReadThread[numOfReadThreads];
+
+ for (int i = 0; i < readThreads.length; i++) {
+
+ readThreads[i] = new ReadThread(i, "a, b, c, d, e, f", 1000);
+ }
+
+ // start the threads
+ for (int j = 0; j < readThreads.length; j++) {
+ readThreads[j].start();
+ }
+
+ for (Thread thr : dropThreads) {
+ try {
+ thr.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ for (Thread thr : readThreads) {
+ try {
+ thr.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, true,
+ true, true, true, true }, rowsToRead);
+ System.out.println("second dump");
+ BasicTable.dumpInfo(path.toString(), System.out, conf);
+
+ // Now make sure the reader reports zero rows.
+ Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+
+ // delete the table
+ BasicTable.drop(path, conf);
+
+ }
+
+ @Test
+ public void test11() throws IOException, ParseException {
+
+ /*
+ * Tests test open non-existing table.
+ */
+
+ try {
+ new BasicTable.Reader(new Path(path.toString(), "non-existing"), conf);
+ Assert.fail("read none existing table should fail");
+ } catch (Exception e) {
+
+ }
+
+ }
+
  @Test
  public void test12() throws IOException, ParseException {
    /*
     * Tests API, path is wrong: dropColumnGroup on a path that is not a
     * table must throw.
     */

    BasicTable.drop(path, conf);

    TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
        "[a];[b];[c];[d];[e];[f]", path, true, false);
    Path wrongPath = new Path(path.toString() + "non-existing");
    try {
      BasicTable.dropColumnGroup(wrongPath, conf, "CG0");
      Assert.fail("should throw excepiton");
    } catch (Exception e) {
      // expected: the path does not refer to an existing table
    }
    BasicTable.drop(path, conf);
  }
+
+ @Test
+ public void test13() throws IOException, ParseException {
+ /*
+ * Tests API, conf is null
+ */
+
+ Path path1 = new Path(path.toString() + "13");
+ TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+ "[a];[b];[c];[d];[e];[f]", path1, true, false);
+ try {
+ BasicTable.dropColumnGroup(path1, null, "CG0");
+ Assert.fail("should throw excepiton");
+ } catch (Exception e) {
+
+ }
+ BasicTable.drop(path1, conf);
+ }
+
  @Test
  public void test14() throws IOException, ParseException {
    /*
     * Tests API, CG name is empty string: must be rejected with an
     * exception.
     */

    Path path1 = new Path(path.toString() + "14");
    TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
        "[a];[b];[c];[d];[e];[f]", path1, true, false);
    try {
      BasicTable.dropColumnGroup(path1, conf, "");
      Assert.fail("should throw excepiton");
    } catch (Exception e) {
      // expected: empty column group name
    }
    BasicTable.drop(path1, conf);
  }
+
+ @Test
+ public void test15() throws IOException, ParseException {
+ /*
+ * Tests API, CG name is null
+ */
+
+ Path path1 = new Path(path.toString() + "15");
+
+ TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+ "[a];[b];[c];[d];[e];[f]", path1, true, false);
+ try {
+ BasicTable.dropColumnGroup(path1, conf, null);
+ Assert.fail("should throw excepiton");
+ } catch (Exception e) {
+
+ }
+ BasicTable.drop(path1, conf);
+ }
+
+ @Test
+ public void test16() throws IOException, ParseException {
+ /*
+ * Tests delete same CG multiple times
+ */
+
+ Path path1 = new Path(path.toString() + "16");
+
+ int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
+ "[a, b]; [c, d]", path1, true, false);
+
+ int rowsToRead = Math.min(10, numRows);
+
+ // normal table.
+ verifyScanner(path1, conf, "a, c, x", new boolean[] { false, false, true },
+ rowsToRead);
+
+ // Now delete ([c, d)
+ BasicTable.dropColumnGroup(path1, conf, "CG1");
+
+ // check various read cases.
+ verifyScanner(path1, conf, "c, a", new boolean[] { true, false },
+ rowsToRead);
+
+ // Now delete ([c, d)again
+ BasicTable.dropColumnGroup(path1, conf, "CG1");
+
+ verifyScanner(path1, conf, "c, a", new boolean[] { true, false },
+ rowsToRead);
+ BasicTable.drop(path1, conf);
+ }
+
+ @Test
+ public void test17() throws IOException, ParseException {
+ /*
+ * test rangesplit
+ */
+ System.out.println("test 17");
+
+ Path path1 = new Path(path.toString() + "17");
+ TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f", "[a,b,c,d,e,f]",
+ path1, true, false);
+
+ BasicTable.dropColumnGroup(path1, conf, "CG0");
+
+ BasicTable.Reader reader = new BasicTable.Reader(path1, conf);
+ reader.setProjection("a, b, c, d, e, f");
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ TableScanner scanner = null;
+ try {
+ scanner = reader.getScanner(splits.get(0), true);
+ } catch (Exception e) {
+ System.out.println("in test 17, getScanner");
+ e.printStackTrace();
+ }
+
+ Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getValue(RowValue);
+
+ Assert.assertEquals(null, RowValue.get(0));
+ Assert.assertFalse(scanner.advance());
+ scanner.getValue(RowValue);
+ Assert.assertEquals(null, RowValue.get(0));
+ BasicTable.drop(path1, conf);
+ }
+
+ /**
+ * A thread that performs a DropColumnGroup.
+ */
+ class DropThread extends Thread {
+
+ private int id;
+
+ public DropThread(int id) {
+
+ this.id = id;
+
+ }
+
+ /**
+ * Executes DropColumnGroup.
+ */
+ public void run() {
+ try {
+ System.out.println("Droping CG: " + id);
+ BasicTable.dropColumnGroup(path, conf, "CG" + id);
+ } catch (Exception e) {
+ System.out.println(id + " - error: " + e);
+ }
+ }
+
+ }
+
+ /**
+ * A thread that performs a ReadColumnGroup.
+ */
+ class ReadThread extends Thread {
+
+ private int id;
+ private String projection;
+ private int numRowsToRead;
+
+ public ReadThread(int id, String projection, int numRowsToRead) {
+ this.id = id;
+ this.projection = projection;
+ this.numRowsToRead = numRowsToRead;
+
+ }
+
+ /**
+ * Executes DropColumnGroup.
+ */
+ public void run() {
+ BasicTable.Reader reader = null;
+ try {
+ reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ TableScanner scanner = reader.getScanner(null, true);
+ Tuple row = TypesUtils.createTuple(reader.getSchema());
+ for (int i = 0; i < numRowsToRead; i++) {
+ scanner.getValue(row);
+ }
+ scanner.advance();
+ scanner.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ }
+}