Author: rangadi Date: Fri Sep 25 22:29:37 2009 New Revision: 819057 URL: http://svn.apache.org/viewvc?rev=819057&view=rev Log: PIG-949. If an entire map is placed in non default column group, and a specific key placed in another CG, the second CG did not work as expected. (Yan Zhou, Jing Huang (tests) via rangadi)
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNonDefaultWholeMapSplit.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=819057&r1=819056&r2=819057&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original) +++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Fri Sep 25 22:29:37 2009 @@ -4,6 +4,10 @@ BUG FIXES - PIG-918 Fix infinite loop only columns in first column group are + PIG-918. Fix infinite loop only columns in first column group are specified. (Yan Zhou via rangadi) - + + PIG-949. If an entire map is placed in non default column group, + and a specific key placed in another CG, the second CG did not + work as expected. (Yan Zhou, Jing Huang (tests) via rangadi) + Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=819057&r1=819056&r2=819057&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java (original) +++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java Fri Sep 25 22:29:37 2009 @@ -334,6 +334,11 @@ mColMap.put(pi.mCGName, cms); } cms.add(pi.mCGIndex); + for (Iterator<ColumnMappingEntry> it = pi.mSplitMaps.iterator(); + it.hasNext(); ) + { + cms.add(it.next()); + } } else { HashSet<ColumnMappingEntry> cms = mColMap.get(fs.name); Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestNonDefaultWholeMapSplit.java URL: 
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.zebra.io;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.io.TableInserter;
import org.apache.hadoop.zebra.io.TableScanner;
import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
import org.apache.hadoop.zebra.types.ParseException;
import org.apache.hadoop.zebra.types.Schema;
import org.apache.hadoop.zebra.types.TypesUtils;
import org.apache.pig.data.Tuple;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests projections over a storage spec (PIG-949) where an entire map column
 * is placed in a non-default column group while specific keys of the same map
 * are split into other column groups: m1 keeps key "a" in one CG, key "b" in
 * another, and the whole remainder of m1 in its own CG; m2 is split by keys
 * x|y and z.
 */
public class TestNonDefaultWholeMapSplit {

  final static String STR_SCHEMA = "m1:map(string),m2:map(map(int))";
  // Four column groups: [m1#{a}], [m2#{x|y}], [m1#{b}, m2#{z}], and the
  // whole-map remainder [m1] — the combination that triggered PIG-949.
  final static String STR_STORAGE = "[m1#{a}];[m2#{x|y}]; [m1#{b}, m2#{z}];[m1]";
  private static Configuration conf;
  private static Path path;
  private static FileSystem fs;

  /**
   * Builds a two-row table at {@link #path} before any test runs.
   * Row "k11": m1={a,b,c}, m2={x,y}; row "k12": m1={a,b2,c2}, m2={z}.
   *
   * @throws IOException if the table cannot be created or written
   */
  @BeforeClass
  public static void setUpOnce() throws IOException {
    conf = new Configuration();
    conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
    conf.setInt("table.input.split.minSize", 64 * 1024);
    conf.set("table.output.tfile.compression", "none");

    RawLocalFileSystem rawLFS = new RawLocalFileSystem();
    fs = new LocalFileSystem(rawLFS);
    path = new Path(fs.getWorkingDirectory(), "TestMap");
    fs = path.getFileSystem(conf);
    // drop any previous tables
    BasicTable.drop(path, conf);
    BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
        STR_STORAGE, false, conf);
    writer.finish();
    Schema schema = writer.getSchema();
    Tuple tuple = TypesUtils.createTuple(schema);
    BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
    int part = 0;
    TableInserter inserter = writer1.getInserter("part" + part, true);
    TypesUtils.resetTuple(tuple);

    // add data to row 1
    // m1:map(string)
    Map<String, String> m1 = new HashMap<String, String>();
    m1.put("a", "A");
    m1.put("b", "B");
    m1.put("c", "C");
    tuple.set(0, m1);

    // m2:map(map(int)) — fully parameterized (the original used a raw Map
    // value type, which produced unchecked-conversion warnings).
    Map<String, Map<String, Integer>> m2 =
        new HashMap<String, Map<String, Integer>>();
    Map<String, Integer> m3 = new HashMap<String, Integer>();
    m3.put("m311", 311);
    m3.put("m321", 321);
    m3.put("m331", 331);
    Map<String, Integer> m4 = new HashMap<String, Integer>();
    m4.put("m411", 411);
    m4.put("m421", 421);
    m4.put("m431", 431);
    m2.put("x", m3);
    m2.put("y", m4);
    tuple.set(1, m2);
    int row = 0;
    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
        .getBytes()), tuple);

    // row 2 — reuse the same map instances after clearing them; the inserter
    // serializes the tuple on insert, so row 1's data is already captured.
    row++;
    TypesUtils.resetTuple(tuple);
    m1.clear();
    m2.clear();
    m3.clear();
    m4.clear();
    // m1:map(string)
    m1.put("a", "A2");
    m1.put("b2", "B2");
    m1.put("c2", "C2");
    tuple.set(0, m1);

    // m2:map(map(int))
    m3.put("m321", 321);
    m3.put("m322", 322);
    m3.put("m323", 323);
    m2.put("z", m3);
    tuple.set(1, m2);
    inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1)
        .getBytes()), tuple);

    // finish building table, closing out the inserter, writer, writer1
    inserter.close();
    writer1.finish();
    writer.close();
  }

  /** Removes the test table after all tests have run. */
  @AfterClass
  public static void tearDownOnce() throws IOException {
    BasicTable.drop(path, conf);
  }

  /**
   * Projects a single map key ("m1#{a}") and verifies both rows return only
   * that key.
   */
  @Test
  public void testRead1() throws IOException, ParseException {
    /*
     * read one map
     */
    String projection = "m1#{a}";
    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
    reader.setProjection(projection);
    List<RangeSplit> splits = reader.rangeSplit(1);
    TableScanner scanner = reader.getScanner(splits.get(0), true);
    BytesWritable key = new BytesWritable();
    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());

    scanner.getKey(key);
    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
    scanner.getValue(RowValue);
    System.out.println("read1 : " + RowValue.toString());
    Assert.assertEquals("{a=A}", RowValue.get(0).toString());

    scanner.advance();
    scanner.getKey(key);
    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
    scanner.getValue(RowValue);
    System.out.println(RowValue.get(0).toString());
    Assert.assertEquals("{a=A2}", RowValue.get(0).toString());

    reader.close();
  }

  /**
   * Projects keys from both maps ("m1#{b}, m2#{x|z}") so the reader must
   * stitch values back together from multiple column groups — the scenario
   * fixed by PIG-949.
   */
  @Test
  public void testRead2() throws IOException, ParseException {
    /*
     * read map of map, stitch
     */
    String projection2 = "m1#{b}, m2#{x|z}";
    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
    reader.setProjection(projection2);
    List<RangeSplit> splits = reader.rangeSplit(1);
    TableScanner scanner = reader.getScanner(splits.get(0), true);
    BytesWritable key = new BytesWritable();
    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());
    scanner.getKey(key);
    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
    scanner.getValue(RowValue);
    System.out.println("map of map: " + RowValue.toString());
    // map of map: ([b#B],[z#,x#{m311=311, m321=321, m331=331}])
    Assert.assertEquals("B", ((Map) RowValue.get(0)).get("b"));
    Assert.assertEquals(321, ((Map) ((Map) RowValue.get(1)).get("x"))
        .get("m321"));
    Assert.assertEquals(311, ((Map) ((Map) RowValue.get(1)).get("x"))
        .get("m311"));
    Assert.assertEquals(331, ((Map) ((Map) RowValue.get(1)).get("x"))
        .get("m331"));
    Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("x"))
        .get("m341"));
    // row 1 has no "z" entry, and unprojected m1 keys must not leak through
    Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("z")));
    Assert.assertEquals(null, ((Map) ((Map) RowValue.get(0)).get("a")));
    Assert.assertEquals(null, ((Map) ((Map) RowValue.get(0)).get("c")));

    System.out.println("rowValue.get)1): " + RowValue.get(1).toString());
    // rowValue.get)1): {z=null, x={m311=311, m321=321, m331=331}}

    scanner.advance();

    scanner.getKey(key);
    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
    scanner.getValue(RowValue);
    // row 2 has "z" but neither "b" in m1 nor "x" in m2
    Assert.assertEquals(null, ((Map) RowValue.get(0)).get("b"));
    Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("x")));
    Assert.assertEquals(323, ((Map) ((Map) RowValue.get(1)).get("z"))
        .get("m323"));
    Assert.assertEquals(322, ((Map) ((Map) RowValue.get(1)).get("z"))
        .get("m322"));
    Assert.assertEquals(321, ((Map) ((Map) RowValue.get(1)).get("z"))
        .get("m321"));
    Assert.assertEquals(null, ((Map) ((Map) RowValue.get(0)).get("a")));
    Assert.assertEquals(null, ((Map) ((Map) RowValue.get(0)).get("b")));
    Assert.assertEquals(null, ((Map) ((Map) RowValue.get(1)).get("a")));

    reader.close();
  }

  /**
   * Negative case: projecting a column that does not exist in the schema
   * ("m5") must yield a one-field tuple whose only value is null for every
   * row, not an error.
   */
  @Test
  public void testRead3() throws IOException, ParseException {
    /*
     * negative , read one map who is non-exist
     */
    String projection = "m5";
    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
    reader.setProjection(projection);

    List<RangeSplit> splits = reader.rangeSplit(1);
    TableScanner scanner = reader.getScanner(splits.get(0), true);
    BytesWritable key = new BytesWritable();
    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());

    scanner.getKey(key);
    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
    scanner.getValue(RowValue);
    Assert.assertEquals(false, RowValue.isNull());
    Assert.assertEquals(null, RowValue.get(0));
    Assert.assertEquals(1, RowValue.size());

    scanner.advance();
    scanner.getKey(key);
    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
    scanner.getValue(RowValue);
    Assert.assertEquals(false, RowValue.isNull());
    Assert.assertEquals(null, RowValue.get(0));
    Assert.assertEquals(1, RowValue.size());

    reader.close();
  }

  /**
   * Projects an existing map with a key present in no row
   * ("m1#{nonexist}"): every row must come back as {nonexist=null}.
   */
  @Test
  public void testRead4() throws IOException, ParseException {
    /*
     * Not exist key for all rows
     */
    String projection = "m1#{nonexist}";
    BasicTable.Reader reader = new BasicTable.Reader(path, conf);
    reader.setProjection(projection);
    List<RangeSplit> splits = reader.rangeSplit(1);
    TableScanner scanner = reader.getScanner(splits.get(0), true);
    BytesWritable key = new BytesWritable();
    Tuple RowValue = TypesUtils.createTuple(scanner.getSchema());

    scanner.getKey(key);
    Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
    scanner.getValue(RowValue);
    System.out.println("read1 : " + RowValue.toString());
    Assert.assertEquals("{nonexist=null}", RowValue.get(0).toString());

    scanner.advance();
    scanner.getKey(key);
    Assert.assertEquals(key, new BytesWritable("k12".getBytes()));
    scanner.getValue(RowValue);
    System.out.println(RowValue.get(0).toString());
    Assert.assertEquals("{nonexist=null}", RowValue.get(0).toString());

    reader.close();
  }
}