Author: yanz
Date: Wed Feb 24 19:27:31 2010
New Revision: 915941

URL: http://svn.apache.org/viewvc?rev=915941&view=rev
Log:
Bag field should always contain a tuple type as the field schema in ResourceSchema object converted from Zebra Schema (xuefuz via yanz)

Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestLoaderWithCollection.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=915941&r1=915940&r2=915941&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original) +++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Wed Feb 24 19:27:31 2010 @@ -60,6 +60,8 @@ BUG FIXES + PIG-1256: Bag field should always contain a tuple type as the field schema in ResourceSchema object converted from Zebra Schema (xuefuz via yanz) + PIG-1227: Throw exception if column group meta file is missing for an unsorted table (yanz) PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz) Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java?rev=915941&r1=915940&r2=915941&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/SchemaConverter.java Wed Feb 24 19:27:31 2010 @@ -158,23 +158,35 @@ ColumnSchema cSchema) { ResourceFieldSchema field = new ResourceFieldSchema(); - if( cSchema.getType() == ColumnType.COLLECTION && cSchema.getSchema().getNumColumns() > 1 ) { - field.setType( ColumnType.RECORD.pigDataType() ); - field.setSchema( convertToResourceSchema( cSchema.getSchema() ) ); - } else if( cSchema.getType() ==ColumnType.ANY && cSchema.getName().isEmpty() ) { // For 
anonymous column + if( cSchema.getType() ==ColumnType.ANY && cSchema.getName().isEmpty() ) { // For anonymous column field.setName( null ); field.setType( DataType.UNKNOWN ); field.setSchema( null ); } else { field.setName( cSchema.getName() ); field.setType( cSchema.getType().pigDataType() ); - if( cSchema.getType() == ColumnType.MAP ) + if( cSchema.getType() == ColumnType.MAP ) { + // Pig doesn't want any schema for a map field. field.setSchema( null ); - else - field.setSchema( convertToResourceSchema( cSchema.getSchema() ) ); + } else { + org.apache.hadoop.zebra.schema.Schema fs = cSchema.getSchema(); + ResourceSchema rs = convertToResourceSchema( fs ); + if( cSchema.getType() == ColumnType.COLLECTION ) { + int count = fs.getNumColumns(); + if( count > 1 || ( count == 1 && fs.getColumn( 0 ).getType() != ColumnType.RECORD ) ) { + // Pig requires a record (tuple) as the schema for a BAG field. + ResourceFieldSchema fieldSchema = new ResourceFieldSchema(); + fieldSchema.setSchema( rs ); + fieldSchema.setType( ColumnType.RECORD.pigDataType() ); + rs = new ResourceSchema(); + rs.setFields( new ResourceFieldSchema[] { fieldSchema } ); + } + } + field.setSchema( rs ); + } } return field; } - + } Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestLoaderWithCollection.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestLoaderWithCollection.java?rev=915941&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestLoaderWithCollection.java (added) +++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestLoaderWithCollection.java Wed Feb 24 19:27:31 2010 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.zebra.pig; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; + + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.zebra.io.BasicTable; +import org.apache.hadoop.zebra.io.TableInserter; +import org.apache.hadoop.zebra.schema.Schema; +import org.apache.hadoop.zebra.types.TypesUtils; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestLoaderWithCollection { + protected static PigServer pigServer; + private static Path pathTable; + + @BeforeClass + public static void setUp() throws Exception { + if (System.getProperty("hadoop.log.dir") == null) { + String base = new File(".").getPath(); + System + .setProperty("hadoop.log.dir", new Path(base).toString() + "./logs"); + } + + pigServer = new PigServer(ExecType.LOCAL); + Configuration conf = new Configuration(); + FileSystem fs = 
LocalFileSystem.get(conf); + + Path pathWorking = fs.getWorkingDirectory(); + pathTable = new Path(pathWorking, "TestCollectionTable"); + // drop any previous tables + BasicTable.drop(pathTable, conf); + + BasicTable.Writer writer = new BasicTable.Writer(pathTable, + "c:collection(a:double)", "[c]", conf); + Schema schema = writer.getSchema(); + Tuple tuple = TypesUtils.createTuple(schema); + + final int numsBatch = 10; + final int numsInserters = 2; + TableInserter[] inserters = new TableInserter[numsInserters]; + for (int i = 0; i < numsInserters; i++) { + inserters[i] = writer.getInserter("ins" + i, false); + } + + for (int b = 0; b < numsBatch; b++) { + for (int i = 0; i < numsInserters; i++) { + TypesUtils.resetTuple(tuple); + + DataBag bagColl = TypesUtils.createBag(); + Schema schColl = schema.getColumn(0).getSchema(); + Tuple tupColl1 = TypesUtils.createTuple(schColl); + Tuple tupColl2 = TypesUtils.createTuple(schColl); + tupColl1.set(0, 3.1415926); + bagColl.add(tupColl1); + tupColl2.set(0, 123.456789); + bagColl.add(tupColl2); + tuple.set(0, bagColl); + + inserters[i].insert(new BytesWritable(("key" + i).getBytes()), tuple); + } + } + for (int i = 0; i < numsInserters; i++) { + inserters[i].close(); + } + writer.close(); + } + + @AfterClass + public static void tearDown() throws Exception { + pigServer.shutdown(); + } + + @Test + public void test() throws ExecException, IOException { + String query = "records = LOAD '" + pathTable.toString() + + "' USING org.apache.hadoop.zebra.pig.TableLoader('c');"; + System.out.println(query); + pigServer.registerQuery(query); + Iterator<Tuple> it = pigServer.openIterator("records"); + while (it.hasNext()) { + Tuple cur = it.next(); + System.out.println(cur); + } + } + +}