Author: yanz
Date: Thu Dec 24 00:37:39 2009
New Revision: 893660
URL: http://svn.apache.org/viewvc?rev=893660&view=rev
Log:
PIG-1136: Support of map split on hash keys with leading underscore (xuefuz via yanz)
Added:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnName.java
Modified:
hadoop/pig/trunk/contrib/zebra/CHANGES.txt
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
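
For context: with this change, a Zebra storage specification can split a map column on hash keys whose names begin with an underscore, while column names themselves still may not start with one. A minimal illustration, using the schema and storage strings from the new test below:

    schema:  f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(f13:double, f14:float, f15:bytes)
    storage: [r.f12, f1, m#{b}]; [m#{_a}, r.f11]

The key split m#{_a} now parses; a column named _f1 is still rejected.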
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=893660&r1=893659&r2=893660&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Thu Dec 24 00:37:39 2009
@@ -12,6 +12,8 @@
IMPROVEMENTS
+ PIG-1136 Support of map split on hash keys with leading underscore (xuefuz via yanz)
+
PIG-1125 Map/Reduce API Changes (Chao Wang via yanz)
PIG-1104 Streaming Support (Chao Wang via yanz)
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt?rev=893660&r1=893659&r2=893660&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt Thu Dec 24 00:37:39 2009
@@ -74,7 +74,8 @@
| <#SPECIALCHAR : ["_"] >
| <#FSSPECIALCHAR: ["-", ":", "/"]>
| <#SCOPEOP : "::">
-| <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* ( <SCOPEOP> ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )*)* >
+| <IDENTIFIER: ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* ( <SCOPEOP> ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )*)* >
+| <MAPKEYIDENTIFIER: ( <LETTER> | <SPECIALCHAR> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )* ( <SCOPEOP> ( <LETTER> )+ ( <DIGIT> | <LETTER> | <SPECIALCHAR> )*)* >
| <SHORT : (<OCTAL>){3} >
}
@@ -299,9 +300,9 @@
}
{
(
- LOOKAHEAD(SchemaRecord()) fs = SchemaRecord(mSchema, name, colIndex)
-| LOOKAHEAD(SchemaMap()) fs = SchemaMap(mSchema, name, colIndex)
-| LOOKAHEAD(AtomSchema()) fs = AtomSchema(mSchema, name, colIndex)
+ LOOKAHEAD(2) fs = SchemaRecord(mSchema, name, colIndex)
+| LOOKAHEAD(2) fs = SchemaMap(mSchema, name, colIndex)
+| LOOKAHEAD(2) fs = AtomSchema(mSchema, name, colIndex)
)
{
return fs;
@@ -517,9 +518,18 @@
HashSet<String> result = new HashSet<String>();
}
{
- t = <IDENTIFIER> { result.add(t.image); }
- ("|" t = <IDENTIFIER> { result.add(t.image); })*
+ t = hashKey() { result.add(t.image); }
+ ("|" t = hashKey() { result.add(t.image); })*
{
return result;
}
}
+
+Token hashKey() :
+{
+ Token t;
+}
+{
+ ( t = <MAPKEYIDENTIFIER> | t = <IDENTIFIER> )
+ { return t; }
+}
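
In short: MAPKEYIDENTIFIER differs from IDENTIFIER only in its leading character class, admitting <SPECIALCHAR> (underscore) alongside <LETTER>, and the new hashKey() production accepts either token so existing keys keep working. Illustrative matches, assuming only the token definitions above:

    _a, _foo1, __x   match MAPKEYIDENTIFIER (valid hash keys after this change)
    abc, a_1         match IDENTIFIER (valid as column names and as hash keys)
    _f1              still invalid as a column name, which uses IDENTIFIER

The LOOKAHEAD(2) change swaps the syntactic lookahead (which speculatively parses a whole schema production) for a cheaper fixed two-token lookahead.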
Added: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnName.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnName.java?rev=893660&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnName.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnName.java Thu Dec 24 00:37:39 2009
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.io.BasicTable.Reader.RangeSplit;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ *
+ * Test conventions for column names. Specifically, '_' is allowed as the leading
+ * character for map keys, but it is disallowed for other fields.
+ *
+ */
+public class TestColumnName {
+ final static String STR_SCHEMA =
+ "f1:bool, r:record(f11:int, f12:long), m:map(string), c:collection(f13:double, f14:float, f15:bytes)";
+ final static String STR_STORAGE = "[r.f12, f1, m#{b}]; [m#{_a}, r.f11]";
+
+ final static String INVALID_STR_SCHEMA =
+ "_f1:bool, _r:record(f11:int, _f12:long), _m:map(string), _c:collection(_f13:double, _f14:float, _f15:bytes)";
+ final static String INVALID_STR_STORAGE = "[_r.f12, _f1, _m#{b}]; [_m#{_a}, _r.f11]";
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem fs = new LocalFileSystem( new RawLocalFileSystem() );
+ private static Path path = new Path( fs.getWorkingDirectory(), "TestColumnName" );
+ static {
+ conf.setInt("table.output.tfile.minBlock.size", 64 * 1024);
+ conf.setInt("table.input.split.minSize", 64 * 1024);
+ conf.set("table.output.tfile.compression", "none");
+ }
+
+ @BeforeClass
+ public static void setUp() throws IOException {
+ // drop any previous tables
+ BasicTable.drop( path, conf );
+
+ BasicTable.Writer writer = new BasicTable.Writer( path, STR_SCHEMA, STR_STORAGE, conf );
+ writer.finish();
+
+ Schema schema = writer.getSchema();
+ Tuple tuple = TypesUtils.createTuple( schema );
+
+ BasicTable.Writer writer1 = new BasicTable.Writer(path, conf);
+ int part = 0;
+ TableInserter inserter = writer1.getInserter("part" + part, true);
+ TypesUtils.resetTuple(tuple);
+
+ tuple.set(0, true);
+
+ Tuple tupRecord;
+ try {
+ tupRecord = TypesUtils.createTuple(schema.getColumnSchema("r").getSchema());
+ } catch (ParseException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ tupRecord.set(0, 1);
+ tupRecord.set(1, 1001L);
+ tuple.set(1, tupRecord);
+
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("_a", "x");
+ map.put("b", "y");
+ map.put("c", "z");
+ tuple.set(2, map);
+
+ DataBag bagColl = TypesUtils.createBag();
+ Schema schColl = schema.getColumn(3).getSchema();
+ Tuple tupColl1 = TypesUtils.createTuple(schColl);
+ Tuple tupColl2 = TypesUtils.createTuple(schColl);
+ byte[] abs1 = new byte[3];
+ byte[] abs2 = new byte[4];
+ tupColl1.set(0, 3.1415926);
+ tupColl1.set(1, 1.6);
+ abs1[0] = 11;
+ abs1[1] = 12;
+ abs1[2] = 13;
+ tupColl1.set(2, new DataByteArray(abs1));
+ bagColl.add(tupColl1);
+ tupColl2.set(0, 123.456789);
+ tupColl2.set(1, 100);
+ abs2[0] = 21;
+ abs2[1] = 22;
+ abs2[2] = 23;
+ abs2[3] = 24;
+ tupColl2.set(2, new DataByteArray(abs2));
+ bagColl.add(tupColl2);
+ tuple.set(3, bagColl);
+
+ int row = 0;
+ inserter.insert(new BytesWritable(String.format("k%d%d", part + 1, row + 1).getBytes()), tuple);
+ inserter.close();
+ writer1.finish();
+
+ writer.close();
+ }
+
+ @AfterClass
+ public static void tearDownOnce() throws IOException {
+ }
+
+ @Test
+ public void testInvalidCase() throws IOException {
+ Path p = new Path( fs.getWorkingDirectory(), "TestColumnNameInvalid" );
+ BasicTable.drop( p, conf );
+
+ try {
+ BasicTable.Writer writer = new BasicTable.Writer( p, INVALID_STR_SCHEMA, INVALID_STR_STORAGE, conf );
+ writer.finish();
+ } catch(IOException ex) {
+ // Do nothing. This is expected.
+ return;
+ }
+
+ Assert.assertTrue( false ); // Test failure.
+ }
+
+ @Test
+ public void testRead() throws IOException, ParseException {
+ String projection = new String("f1, m#{_a|b}, r, m#{c}");
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ // long totalBytes = reader.getStatus().getSize();
+
+ List<RangeSplit> splits = reader.rangeSplit(1);
+ reader.close();
+ reader = new BasicTable.Reader(path, conf);
+ reader.setProjection(projection);
+ TableScanner scanner = reader.getScanner(splits.get(0), true);
+ BytesWritable key = new BytesWritable();
+ Tuple value = TypesUtils.createTuple(scanner.getSchema());
+
+ scanner.getKey(key);
+ Assert.assertEquals(key, new BytesWritable("k11".getBytes()));
+ scanner.getValue(value);
+
+ Tuple recordTuple = (Tuple) value.get(2);
+ Assert.assertEquals(1, recordTuple.get(0));
+ Assert.assertEquals(1001L, recordTuple.get(1));
+ Assert.assertEquals(true, value.get(0));
+
+ HashMap<String, Object> mapval = (HashMap<String, Object>) value.get(1);
+ Assert.assertEquals("x", mapval.get("_a"));
+ Assert.assertEquals("y", mapval.get("b"));
+ Assert.assertEquals(null, mapval.get("c"));
+ mapval = (HashMap<String, Object>) value.get(3);
+ Assert.assertEquals("z", mapval.get("c"));
+ Assert.assertEquals(null, mapval.get("_a"));
+ Assert.assertEquals(null, mapval.get("b"));
+ reader.close();
+ }
+
+ @Test
+ public void testProjectionParsing() throws IOException, ParseException {
+ String projection = new String( "f1, m#{_a}, _r, m#{c}, m" );
+ BasicTable.Reader reader = new BasicTable.Reader( path, conf );
+ try {
+ reader.setProjection( projection );
+ reader.close();
+ } catch(ParseException ex) {
+ // Expected.
+ return;
+ }
+
+ Assert.assertTrue( false );
+ }
+
+}
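
As the assertions in testRead above indicate, a projection such as "f1, m#{_a|b}, r, m#{c}" yields a four-field tuple in projection order, with each m#{...} split materialized as its own map holding only the named keys:

    projection:  f1,    m#{_a|b},         r,           m#{c}
    tuple:       [true, {_a:"x", b:"y"}, (1, 1001L),  {c:"z"}]

Keys outside a given split (for example "c" in the first map field) come back as null.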