Author: pradeepkth
Date: Mon Dec 14 19:26:18 2009
New Revision: 890453
URL: http://svn.apache.org/viewvc?rev=890453&view=rev
Log:
Additional patch for PIG-1090: Support for conversions between Pig Schema and
the new Resource Schema. (rding via pradeepkth)
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java?rev=890453&r1=890452&r2=890453&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
Mon Dec 14 19:26:18 2009
@@ -36,25 +36,24 @@
// initializing arrays to empty so we don't have to worry about NPEs
// setters won't set to null
- public ResourceFieldSchema[] fields = new ResourceFieldSchema[0];
+ private ResourceFieldSchema[] fields = new ResourceFieldSchema[0];
public enum Order { ASCENDING, DESCENDING }
- public int[] sortKeys = {}; // each entry is an offset into the fields
array.
- public Order[] sortKeyOrders = new Order[0];
-
-
- public int version = 0;
+ private int[] sortKeys = new int[0]; // each entry is an offset into the
fields array.
+ private Order[] sortKeyOrders = new Order[0];
+
+ private int version = 0;
public static class ResourceFieldSchema {
- public String name;
+ private String name;
// values are constants from DataType
- public byte type;
+ private byte type;
- public String description;
+ private String description;
// nested tuples and bags will have their own schema
- public ResourceSchema schema;
+ private ResourceSchema schema;
public ResourceFieldSchema() {
@@ -64,6 +63,14 @@
type = fieldSchema.type;
name = fieldSchema.alias;
description = "autogenerated from Pig Field Schema";
+ if (type == DataType.BAG && fieldSchema.schema != null) { // allow
partial schema
+ List<FieldSchema> slst = fieldSchema.schema.getFields();
+ if (slst.size() != 1 || slst.get(0).type != DataType.TUPLE) {
+ throw new IllegalArgumentException("Invalid Pig schema: " +
+ "bag schema must have tuple as its field.");
+ }
+ }
+ // XXX allow partial schema
if (type == DataType.BAG || type == DataType.TUPLE) {
schema = new ResourceSchema(fieldSchema.schema);
} else {
@@ -82,6 +89,7 @@
public byte getType() {
return type;
}
+
public ResourceFieldSchema setType(byte type) {
this.type = type;
return this;
@@ -90,7 +98,7 @@
public String getDescription() {
return description;
}
-
+
public ResourceFieldSchema setDescription(String description) {
this.description = description;
return this;
@@ -104,6 +112,18 @@
this.schema = schema;
return this;
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(this.name).append(":");
+ if (DataType.isAtomic(this.type)) {
+ sb.append(DataType.findTypeName(this.type));
+ } else {
+ stringifyResourceSchema(sb, this.schema, this.type);
+ }
+ return sb.toString();
+ }
}
@@ -116,8 +136,7 @@
fields = new ResourceFieldSchema[pigSchemaFields.size()];
for (int i=0; i<fields.length; i++) {
fields[i] = new ResourceFieldSchema(pigSchemaFields.get(i));
- }
-
+ }
}
public int getVersion() {
@@ -150,6 +169,7 @@
public int[] getSortKeys() {
return sortKeys;
}
+
public ResourceSchema setSortKeys(int[] sortKeys) {
if (sortKeys != null)
this.sortKeys = Arrays.copyOf(sortKeys, sortKeys.length);
@@ -165,4 +185,71 @@
this.sortKeyOrders = Arrays.copyOf(sortKeyOrders,
sortKeyOrders.length);
return this;
}
+
+ public static boolean equals(ResourceSchema rs1, ResourceSchema rs2) {
+ if (rs1 == null) {
+ return rs2 == null ? true : false;
+ }
+
+ if (rs2 == null) {
+ return false;
+ }
+
+ if (rs1.getVersion() != rs2.getVersion()
+ || !Arrays.equals(rs1.getSortKeys(), rs2.getSortKeys())
+ || !Arrays.equals(rs1.getSortKeyOrders(),
rs2.getSortKeyOrders())) {
+ return false;
+ }
+
+ ResourceFieldSchema[] rfs1 = rs1.getFields();
+ ResourceFieldSchema[] rfs2 = rs1.getFields();
+
+ if (rfs1.length != rfs2.length) return false;
+
+ for (int i=0; i<rfs1.length; i++) {
+ if (!rfs1[i].getName().equals(rfs2[i].getName())
+ || rfs1[i].getType() != rfs2[i].getType()) {
+ return false;
+ }
+ if (!equals(rfs1[i].getSchema(), rfs2[i].getSchema())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("rs:");
+ stringifyResourceSchema(sb, this, DataType.UNKNOWN) ;
+ return sb.toString();
+ }
+
+ /**
+  * Appends a textual form of a schema to the given builder.
+  * <p>
+  * The fields' string forms are written separated by commas and wrapped
+  * in a delimiter pair selected by {@code type}: {@code <...>} for
+  * {@code DataType.UNKNOWN} (used for a top-level schema), <code>{...}</code>
+  * for {@code DataType.BAG} and {@code (...)} for {@code DataType.TUPLE}.
+  * Any other type gets no delimiters.
+  *
+  * @param sb   the builder to append to
+  * @param rs   the schema to render; may be null when a complex field
+  *             carries no nested schema, in which case only the
+  *             delimiters are written
+  * @param type the DataType constant of the field owning the schema
+  */
+ private static void stringifyResourceSchema(StringBuilder sb,
+         ResourceSchema rs, byte type) {
+     if (type == DataType.UNKNOWN) {
+         sb.append("<");
+     } else if (type == DataType.BAG) {
+         sb.append("{");
+     } else if (type == DataType.TUPLE) {
+         sb.append("(");
+     }
+
+     // A complex field built through the setters may have no nested
+     // schema (partial schema); there is nothing to list in that case.
+     if (rs != null) {
+         ResourceFieldSchema[] fields = rs.getFields();
+         for (int i = 0; i < fields.length; i++) {
+             if (i > 0) {
+                 sb.append(",");
+             }
+             sb.append(fields[i]);
+         }
+     }
+
+     if (type == DataType.UNKNOWN) {
+         sb.append(">");
+     } else if (type == DataType.BAG) {
+         sb.append("}");
+     } else if (type == DataType.TUPLE) {
+         sb.append(")");
+     }
+ }
}
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=890453&r1=890452&r2=890453&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
(original)
+++
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
Mon Dec 14 19:26:18 2009
@@ -1603,18 +1603,28 @@
public static Schema getPigSchema(ResourceSchema rSchema)
throws FrontendException {
List<FieldSchema> fsList = new ArrayList<FieldSchema>();
- for(ResourceFieldSchema rfs : rSchema.fields) {
- FieldSchema fs = new FieldSchema(rfs.name, rfs.schema == null ?
null:
- getPigSchema(rfs.schema), rfs.type);
+ for(ResourceFieldSchema rfs : rSchema.getFields()) {
+ FieldSchema fs = new FieldSchema(rfs.getName(),
+ rfs.getSchema() == null ?
+ null : getPigSchema(rfs.getSchema()),
rfs.getType());
// check if we have a need to set twoLevelAcccessRequired flag
- if(rfs.type == DataType.BAG) {
- if(fs.schema.size() == 1) {
- FieldSchema innerFs = fs.schema.getField(0);
- if(innerFs.type == DataType.TUPLE && innerFs.schema !=
null) {
- fs.schema.setTwoLevelAccessRequired(true);
+ if(rfs.getType() == DataType.BAG) {
+ if (fs.schema != null) { // allow partial schema
+ if (fs.schema.size() == 1) {
+ FieldSchema innerFs = fs.schema.getField(0);
+ if (innerFs.type != DataType.TUPLE) {
+ throw new FrontendException("Invalide resource
schema: " +
+ "bag schema must have tuple as its
field.");
+ }
+ if (innerFs.schema != null) { // allow partial schema
+ fs.schema.setTwoLevelAccessRequired(true);
+ }
+ } else {
+ throw new FrontendException("Invalide resource schema:
" +
+ "bag schema should have exact one
field.");
}
- }
+ }
}
fsList.add(fs);
}
Modified:
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java?rev=890453&r1=890452&r2=890453&view=diff
==============================================================================
---
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java
(original)
+++
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java
Mon Dec 14 19:26:18 2009
@@ -18,9 +18,11 @@
package org.apache.pig.test;
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -29,31 +31,197 @@
import org.apache.pig.test.utils.TypeCheckingTestUtil;
import org.junit.Test;
-public class TestResourceSchema extends TestCase {
+public class TestResourceSchema {
/**
* Test that ResourceSchema is correctly created given a
- * pig.Schema and vice versa
- * @throws FrontendException
- * @throws SchemaMergeException
- * @throws ExecException
+ * pig.Schema and vice versa
*/
@Test
- public void testResourceFlatSchemaCreation() throws ExecException,
SchemaMergeException, FrontendException {
+ public void testResourceFlatSchemaCreation()
+ throws ExecException, SchemaMergeException, FrontendException {
String [] aliases ={"f1", "f2"};
byte[] types = {DataType.CHARARRAY, DataType.INTEGER};
Schema origSchema = TypeCheckingTestUtil.genFlatSchema(
aliases,types);
ResourceSchema rsSchema = new ResourceSchema(origSchema);
- assertEquals("num fields", aliases.length, rsSchema.fields.length);
- ResourceSchema.ResourceFieldSchema[] fields = rsSchema.fields;
+ assertEquals("num fields", aliases.length,
rsSchema.getFields().length);
+ ResourceSchema.ResourceFieldSchema[] fields = rsSchema.getFields();
for (int i=0; i<fields.length; i++) {
- assertEquals(fields[i].name, aliases[i]);
- assertEquals(fields[i].type, types[i]);
+ assertEquals(fields[i].getName(), aliases[i]);
+ assertEquals(fields[i].getType(), types[i]);
}
Schema genSchema = Schema.getPigSchema(rsSchema);
- assertTrue("generated schema equals original" ,
Schema.equals(genSchema, origSchema, true, false));
+ assertTrue("generated schema equals original",
+ Schema.equals(genSchema, origSchema, true, false));
}
+ /**
+ * Test that ResourceSchema is correctly created given a
+ * pig.Schema and vice versa
+ */
+ @Test
+ public void testResourceFlatSchemaCreation2()
+ throws ExecException, SchemaMergeException, FrontendException {
+ String [] aliases ={"f1", "f2"};
+ byte[] types = {DataType.CHARARRAY, DataType.INTEGER};
+
+ Schema origSchema = new Schema(
+ new Schema.FieldSchema("t1",
+ new Schema(
+ new Schema.FieldSchema("t0",
+ TypeCheckingTestUtil.genFlatSchema(
+ aliases,types),
+ DataType.TUPLE)),
DataType.BAG));
+
+ ResourceSchema rsSchema = new ResourceSchema(origSchema);
+
+ Schema genSchema = Schema.getPigSchema(rsSchema);
+ assertTrue("generated schema equals original",
+ Schema.equals(genSchema, origSchema, true, false));
+ }
+
+ /**
+ * Test that Pig Schema is correctly created given a
+ * ResourceSchema and vice versa. Test also that
+ * TwoLevelAccess flag is set for Pig Schema when needed.
+ */
+ @Test
+ public void testToPigSchemaWithTwoLevelAccess() throws FrontendException {
+ ResourceFieldSchema[] level0 =
+ new ResourceFieldSchema[] {
+ new ResourceFieldSchema()
+ .setName("fld0").setType(DataType.CHARARRAY),
+ new ResourceFieldSchema()
+ .setName("fld1").setType(DataType.DOUBLE),
+ new ResourceFieldSchema()
+ .setName("fld2").setType(DataType.INTEGER)
+ };
+
+ ResourceSchema rSchema0 = new ResourceSchema()
+ .setFields(level0);
+
+ ResourceFieldSchema[] level1 =
+ new ResourceFieldSchema[] {
+ new ResourceFieldSchema()
+ .setName("t1").setType(DataType.TUPLE)
+ .setSchema(rSchema0)
+ };
+
+ ResourceSchema rSchema1 = new ResourceSchema()
+ .setFields(level1);
+
+ ResourceFieldSchema[] level2 =
+ new ResourceFieldSchema[] {
+ new ResourceFieldSchema()
+ .setName("t2").setType(DataType.BAG)
+ .setSchema(rSchema1)
+ };
+
+ ResourceSchema origSchema = new ResourceSchema()
+ .setFields(level2);
+
+ Schema pSchema = Schema.getPigSchema(origSchema);
+
+ assertTrue(CheckTwoLevelAccess(pSchema));
+
+ assertTrue(ResourceSchema.equals(origSchema, new
ResourceSchema(pSchema)));
+ }
+
+ private boolean CheckTwoLevelAccess(Schema s) {
+ if (s == null) return false;
+ for (Schema.FieldSchema fs : s.getFields()) {
+ if (fs.type == DataType.BAG
+ && fs.schema != null
+ && fs.schema.isTwoLevelAccessRequired()) {
+ return true;
+ }
+ if (CheckTwoLevelAccess(fs.schema)) return true;
+ }
+ return false;
+ }
+
+ /**
+ * Test invalid Resource Schema: multiple fields for a bag
+ */
+ @Test(expected=FrontendException.class)
+ public void testToPigSchemaWithInvalidSchema() throws FrontendException {
+ ResourceFieldSchema[] level0 = new ResourceFieldSchema[] {
+ new ResourceFieldSchema()
+ .setName("fld0").setType(DataType.CHARARRAY),
+ new ResourceFieldSchema()
+ .setName("fld1").setType(DataType.DOUBLE),
+ new ResourceFieldSchema()
+ .setName("fld2").setType(DataType.INTEGER)
+ };
+
+ ResourceSchema rSchema0 = new ResourceSchema()
+ .setFields(level0);
+
+ ResourceFieldSchema[] level2 = new ResourceFieldSchema[] {
+ new ResourceFieldSchema()
+ .setName("t2").setType(DataType.BAG).setSchema(rSchema0)
+ };
+
+ ResourceSchema rSchema2 = new ResourceSchema()
+ .setFields(level2);
+
+ Schema.getPigSchema(rSchema2);
+ }
+ /**
+ * Test invalid Resource Schema: bag without tuple field
+ */
+ @Test(expected=FrontendException.class)
+ public void testToPigSchemaWithInvalidSchema2() throws FrontendException {
+ ResourceFieldSchema[] level0 = new ResourceFieldSchema[] {
+ new ResourceFieldSchema()
+ .setName("fld0").setType(DataType.CHARARRAY)
+ };
+
+ ResourceSchema rSchema0 = new ResourceSchema()
+ .setFields(level0);
+
+ ResourceFieldSchema[] level2 = new ResourceFieldSchema[] {
+ new ResourceFieldSchema()
+ .setName("t2").setType(DataType.BAG).setSchema(rSchema0)
+ };
+
+ ResourceSchema rSchema2 = new ResourceSchema()
+ .setFields(level2);
+
+ Schema.getPigSchema(rSchema2);
+ }
+
+ /**
+ * Test invalid Pig Schema: multiple fields for a bag
+ */
+ @Test(expected=IllegalArgumentException.class)
+ public void testResourceSchemaWithInvalidPigSchema()
+ throws FrontendException {
+ String [] aliases ={"f1", "f2"};
+ byte[] types = {DataType.CHARARRAY, DataType.INTEGER};
+ Schema level0 = TypeCheckingTestUtil.genFlatSchema(
+ aliases,types);
+ Schema.FieldSchema fld0 =
+ new Schema.FieldSchema("f0", level0, DataType.BAG);
+ Schema level1 = new Schema(fld0);
+ new ResourceSchema(level1);
+ }
+
+ /**
+ * Test invalid Pig Schema: bag without tuple field
+ */
+ @Test(expected=IllegalArgumentException.class)
+ public void testResourceSchemaWithInvalidPigSchema2()
+ throws FrontendException {
+ String [] aliases ={"f1"};
+ byte[] types = {DataType.INTEGER};
+ Schema level0 = TypeCheckingTestUtil.genFlatSchema(
+ aliases,types);
+ Schema.FieldSchema fld0 =
+ new Schema.FieldSchema("f0", level0, DataType.BAG);
+ Schema level1 = new Schema(fld0);
+ new ResourceSchema(level1);
+ }
}
\ No newline at end of file