Author: yanz
Date: Tue Apr 20 15:51:38 2010
New Revision: 935968
URL: http://svn.apache.org/viewvc?rev=935968&view=rev
Log:
PIG-1375 Support of multiple Zebra table writing through Pig (chaow via yanz)
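With this change a single Pig store statement can write to several Zebra tables at once; a user-supplied ZebraOutputPartition subclass decides which table each row goes to. A minimal sketch of the Pig-side call, modeled on the TestMultipleOutputs1 test added below (the input file, output paths, and partitioner class are the ones that test uses; an initialized PigServer is assumed):

    // Load word/count rows, then store each row into one of three Zebra tables.
    // The comma-separated store location lists one path per output table; the
    // second TableStorer argument names the ZebraOutputPartition implementation.
    pigServer.registerQuery(
        "records = LOAD 'multi-input.txt' as (word:chararray, count:int);");
    pigServer.registerQuery(
        "store records into 'us,india,japan' using " +
        "org.apache.hadoop.zebra.pig.TableStorer('[word,count]', " +
        "'org.apache.hadoop.zebra.pig.TestMultipleOutputs1$OutputPartitionerClass');");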
Added:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java
Modified:
hadoop/pig/trunk/contrib/zebra/CHANGES.txt
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=935968&r1=935967&r2=935968&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Tue Apr 20 15:51:38 2010
@@ -18,6 +18,8 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ PIG-1375 Support of multiple Zebra table writing through Pig (chaow via yanz)
+
PIG-1351 Addition of type check when writing to basic table (chaow via yanz)
PIG-1361 Zebra TableLoader.getSchema() should return the projectionSchema
specified in the constructor of TableLoader instead of pruned projection by Pig
(gauravj via daijy)
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java?rev=935968&r1=935967&r2=935968&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableOutputFormat.java Tue Apr 20 15:51:38 2010
@@ -36,6 +36,7 @@ import org.apache.hadoop.zebra.io.TableI
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.types.Partition;
import org.apache.hadoop.zebra.types.SortInfo;
+import org.apache.hadoop.zebra.types.TypesUtils;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.tfile.TFile;
import org.apache.pig.data.Tuple;
@@ -153,7 +154,7 @@ public class BasicTableOutputFormat exte
private static final String OUTPUT_COMPARATOR = "mapreduce.lib.table.output.comparator";
static final String IS_MULTI = "multi";
public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS = "zebra.output.partitioner.class";
-
+ public static final String ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS = "zebra.output.partitioner.class.arguments";
/**
* Set the multiple output paths of the BasicTable in JobContext
@@ -164,9 +165,11 @@ public class BasicTableOutputFormat exte
* The comma separated output paths to the tables.
* The path must either be non-existent or an empty directory.
* @param theClass
- * Zebra output partitoner class
+ * Zebra output partitioner class
+ *
+ * @deprecated Use {@link #setMultipleOutputs(JobContext, Class<? extends ZebraOutputPartition>, Path ...)} instead.
+ *
*/
-
public static void setMultipleOutputs(JobContext jobContext, String commaSeparatedLocations, Class<? extends ZebraOutputPartition> theClass)
throws IOException {
Configuration conf = jobContext.getConfiguration();
@@ -179,18 +182,17 @@ public class BasicTableOutputFormat exte
setZebraOutputPartitionClass(jobContext, theClass);
}
- /**
+ /**
* Set the multiple output paths of the BasicTable in JobContext
*
* @param jobContext
* The JobContext object.
+ * @param theClass
+ * Zebra output partitioner class
* @param paths
* The list of paths
* The path must either be non-existent or an empty directory.
- * @param theClass
- * Zebra output partitioner class
*/
-
public static void setMultipleOutputs(JobContext jobContext, Class<? extends ZebraOutputPartition> theClass, Path... paths)
throws IOException {
Configuration conf = jobContext.getConfiguration();
@@ -209,9 +211,28 @@ public class BasicTableOutputFormat exte
}
conf.setBoolean(IS_MULTI, true);
setZebraOutputPartitionClass(jobContext, theClass);
-
}
-
+
+ /**
+ * Set the multiple output paths of the BasicTable in JobContext
+ *
+ * @param jobContext
+ * The JobContext object.
+ * @param theClass
+ * Zebra output partitioner class
+ * @param arguments
+ * Arguments string to partitioner class
+ * @param paths
+ * The list of paths
+ * The path must either be non-existent or an empty directory.
+ */
+ public static void setMultipleOutputs(JobContext jobContext, Class<? extends ZebraOutputPartition> theClass, String arguments, Path... paths)
+ throws IOException {
+ setMultipleOutputs(jobContext, theClass, paths);
+ if (arguments != null) {
+ jobContext.getConfiguration().set(ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS, arguments);
+ }
+ }
/**
* Set the multiple output paths of the BasicTable in JobContext
@@ -256,7 +277,6 @@ public class BasicTableOutputFormat exte
}
-
private static void setZebraOutputPartitionClass(
JobContext jobContext, Class<? extends ZebraOutputPartition> theClass) throws IOException {
if (!ZebraOutputPartition.class.isAssignableFrom(theClass))
@@ -264,7 +284,6 @@ public class BasicTableOutputFormat exte
jobContext.getConfiguration().set(ZEBRA_OUTPUT_PARTITIONER_CLASS, theClass.getName());
}
-
public static Class<? extends ZebraOutputPartition> getZebraOutputPartitionClass(JobContext jobContext) throws IOException {
Configuration conf = jobContext.getConfiguration();
@@ -287,8 +306,6 @@ public class BasicTableOutputFormat exte
}
-
-
/**
* Set the output path of the BasicTable in JobContext
*
@@ -392,7 +409,6 @@ public class BasicTableOutputFormat exte
}
-
/**
* Generates a BytesWritable key for the input key
* using the key generator provided. Sort Key(s) are used to generate this object
@@ -409,9 +425,6 @@ public class BasicTableOutputFormat exte
return kg.generateKey(t);
}
-
-
-
/**
* Set the table storage hint in JobContext, should be called after
setSchema is
* called.
@@ -737,7 +750,13 @@ class TableOutputCommitter extends Outpu
class TableRecordWriter extends RecordWriter<BytesWritable, Tuple> {
private final TableInserter inserter[];
private org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition op = null;
-
+
+ // for Pig's call path;
+ final private BytesWritable KEY0 = new BytesWritable(new byte[0]);
+ private int[] sortColIndices = null;
+ private KeyGenerator builder = null;
+ private Tuple t = null;
+
public TableRecordWriter(String path, TaskAttemptContext context)
throws IOException {
Configuration conf = context.getConfiguration();
if(conf.getBoolean(BasicTableOutputFormat.IS_MULTI, false) == true) {
@@ -753,8 +772,36 @@ class TableRecordWriter extends RecordWr
BasicTable.Writer writer =
new BasicTable.Writer(paths[i], conf);
this.inserter[i] = writer.getInserter( inserterName, true, checkType);
+
+ // Set up SortInfo related stuff only once;
+ if (i == 0) {
+ if (writer.getSortInfo() != null)
+ {
+ sortColIndices = writer.getSortInfo().getSortIndices();
+ SortInfo sortInfo = writer.getSortInfo();
+ String[] sortColNames = sortInfo.getSortColumnNames();
+ org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema();
+
+ byte[] types = new byte[sortColNames.length];
+
+ for(int j =0 ; j < sortColNames.length; ++j){
+ types[j] = schema.getColumn(sortColNames[j]).getType().pigDataType();
+ }
+ t = TypesUtils.createTuple(sortColNames.length);
+ builder = makeKeyBuilder(types);
+ }
+ }
}
}
+
+ private KeyGenerator makeKeyBuilder(byte[] elems) {
+ ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+ for (int i = 0; i < elems.length; ++i) {
+ exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+ }
+ return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+ }
+
@Override
public void close(TaskAttemptContext context) throws IOException {
@@ -765,6 +812,17 @@ class TableRecordWriter extends RecordWr
@Override
public void write(BytesWritable key, Tuple value) throws IOException {
+ if (key == null) {
+ if (sortColIndices != null) { // If this is a sorted table and key is null (Pig's call path);
+ for (int i =0; i < sortColIndices.length;++i) {
+ t.set(i, value.get(sortColIndices[i]));
+ }
+ key = builder.generateKey(t);
+ } else { // for unsorted table;
+ key = KEY0;
+ }
+ }
+
if(op != null ) {
int idx = op.getOutputPartition(key, value);
if(idx < 0 || (idx >= inserter.length)) {
@@ -775,6 +833,5 @@ class TableRecordWriter extends RecordWr
inserter[0].insert(key, value);
}
}
-
}
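For the MapReduce-side API above, here is a minimal sketch of a job setup using the new arguments-carrying setMultipleOutputs overload. The RegionPartitioner class, table names, and paths are illustrative, patterned on OutputPartitionerClass in TestMultipleOutputs2 below, which reads its routing table from the new ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS property:

    // Illustrative partitioner: route each row by its first column, using the
    // table names passed through the partitioner-arguments property.
    public static class RegionPartitioner extends ZebraOutputPartition {
        @Override
        public int getOutputPartition(BytesWritable key, Tuple value) {
            String region = null;
            try {
                region = (String) value.get(0);
            } catch (Exception e) {
                // fall through and default to the first table
            }
            String[] names = conf.get(
                BasicTableOutputFormat.ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS).split(",");
            for (int i = 0; i < names.length; i++) {
                if (names[i].equals(region))
                    return i;
            }
            return 0;
        }
    }

    // Job setup: one Path per output table, plus the argument string the
    // partitioner will see at run time.
    BasicTableOutputFormat.setMultipleOutputs(job, RegionPartitioner.class,
        "us,india,japan", new Path("us_0"), new Path("india_1"), new Path("japan_2"));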
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java?rev=935968&r1=935967&r2=935968&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableStorer.java Tue Apr 20 15:51:38 2010
@@ -26,20 +26,14 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.zebra.pig.comparator.ComparatorExpr;
-import org.apache.hadoop.zebra.pig.comparator.ExprUtils;
-import org.apache.hadoop.zebra.pig.comparator.KeyGenerator;
-import org.apache.hadoop.zebra.io.BasicTable;
-import org.apache.hadoop.zebra.io.TableInserter;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.mapreduce.ZebraSchema;
+import org.apache.hadoop.zebra.mapreduce.ZebraSortInfo;
+import org.apache.hadoop.zebra.mapreduce.ZebraStorageHint;
import org.apache.hadoop.zebra.parser.ParseException;
-import org.apache.hadoop.zebra.types.SortInfo;
-import org.apache.hadoop.zebra.types.TypesUtils;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
@@ -57,282 +51,180 @@ public class TableStorer extends StoreFu
private static final String UDFCONTEXT_SORT_INFO = "zebra.UDFContext.sortInfo";
private static final String UDFCONTEXT_OUTPUT_CHECKTYPE = "zebra.UDFContext.checkType";
- static final String OUTPUT_STORAGEHINT = "mapreduce.lib.table.output.storagehint";
- static final String OUTPUT_SCHEMA = "mapreduce.lib.table.output.schema";
- static final String OUTPUT_PATH = "mapreduce.lib.table.output.dir";
- static final String SORT_INFO = "mapreduce.lib.table.sort.info";
static final String OUTPUT_CHECKTYPE = "mapreduce.lib.table.output.checktype";
private String storageHintString = null;
private String udfContextSignature = null;
- private TableRecordWriter tableRecordWriter = null;
+ private RecordWriter<BytesWritable, Tuple> tableRecordWriter = null;
+ private String partitionClassString = null;
+ Class<? extends ZebraOutputPartition> partitionClass = null;
+ private String partitionClassArgumentsString = null;
public TableStorer() {
}
- public TableStorer(String storageHintStr) throws ParseException, IOException {
- storageHintString = storageHintStr;
+ public TableStorer(String storageHintString) {
+ this.storageHintString = storageHintString;
}
+ public TableStorer(String storageHintString, String partitionClassString) {
+ this.storageHintString = storageHintString;
+ this.partitionClassString = partitionClassString;
+ }
+
+ public TableStorer(String storageHintString, String partitionClassString, String partitionClassArgumentsString) {
+ this.storageHintString = storageHintString;
+ this.partitionClassString = partitionClassString;
+ this.partitionClassArgumentsString = partitionClassArgumentsString;
+ }
+
+
@Override
public void putNext(Tuple tuple) throws IOException {
+ try {
tableRecordWriter.write( null, tuple );
+ } catch (InterruptedException e) {
+ throw new IOException(e.getMessage());
+ }
}
@Override
public void checkSchema(ResourceSchema schema) throws IOException {
- // Get schemaStr and sortColumnNames from the given schema. In the process, we
- // also validate the schema and sorting info.
- ResourceSchema.Order[] orders = schema.getSortKeyOrders();
- boolean descending = false;
- for (ResourceSchema.Order order : orders)
+ // Get schemaStr and sortColumnNames from the given schema. In the process, we
+ // also validate the schema and sorting info.
+ ResourceSchema.Order[] orders = schema.getSortKeyOrders();
+ boolean descending = false;
+ for (ResourceSchema.Order order : orders)
+ {
+ if (order == ResourceSchema.Order.DESCENDING)
{
- if (order == ResourceSchema.Order.DESCENDING)
- {
- Log LOG = LogFactory.getLog(TableStorer.class);
- LOG.warn("Sorting in descending order is not supported by Zebra
and the table will be unsorted.");
- descending = true;
- break;
- }
- }
- StringBuilder sortColumnNames = new StringBuilder();
- if (!descending) {
- ResourceSchema.ResourceFieldSchema[] fields = schema.getFields();
- int[] index = schema.getSortKeys();
+ Log LOG = LogFactory.getLog(TableStorer.class);
+ LOG.warn("Sorting in descending order is not supported by Zebra and
the table will be unsorted.");
+ descending = true;
+ break;
+ }
+ }
+ StringBuilder sortColumnNames = new StringBuilder();
+ if (!descending) {
+ ResourceSchema.ResourceFieldSchema[] fields = schema.getFields();
+ int[] index = schema.getSortKeys();
+
+ for( int i = 0; i< index.length; i++ ) {
+ ResourceFieldSchema field = fields[index[i]];
+ String name = field.getName();
+ if( name == null )
+ throw new IOException("Zebra does not support column positional
reference yet");
+ if( !org.apache.pig.data.DataType.isAtomic( field.getType() ) )
+ throw new IOException( "Field [" + name + "] is not of simple
type as required for a sort column now." );
+ if( i > 0 )
+ sortColumnNames.append( "," );
+ sortColumnNames.append( name );
+ }
+ }
+
+ // Convert resource schema to zebra schema
+ org.apache.hadoop.zebra.schema.Schema zebraSchema;
+ try {
+ zebraSchema = SchemaConverter.convertFromResourceSchema( schema );
+ } catch (ParseException ex) {
+ throw new IOException("Exception thrown from SchemaConverter: " +
ex.getMessage() );
+ }
+
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(
+ this.getClass(), new String[]{ udfContextSignature } );
+ properties.setProperty( UDFCONTEXT_OUTPUT_SCHEMA, zebraSchema.toString() );
+ properties.setProperty( UDFCONTEXT_SORT_INFO, sortColumnNames.toString() );
- for( int i = 0; i< index.length; i++ ) {
- ResourceFieldSchema field = fields[index[i]];
- String name = field.getName();
- if( name == null )
- throw new IOException("Zebra does not support column
positional reference yet");
- if( !org.apache.pig.data.DataType.isAtomic( field.getType() ) )
- throw new IOException( "Field [" + name + "] is not of simple
type as required for a sort column now." );
- if( i > 0 )
- sortColumnNames.append( "," );
- sortColumnNames.append( name );
- }
- }
-
- // Convert resource schema to zebra schema
- org.apache.hadoop.zebra.schema.Schema zebraSchema;
- try {
- zebraSchema = SchemaConverter.convertFromResourceSchema( schema );
- } catch (ParseException ex) {
- throw new IOException("Exception thrown from SchemaConverter: " +
ex.getMessage() );
- }
-
- Properties properties = UDFContext.getUDFContext().getUDFProperties(
- this.getClass(), new String[]{ udfContextSignature } );
- properties.setProperty( UDFCONTEXT_OUTPUT_SCHEMA, zebraSchema.toString() );
- properties.setProperty( UDFCONTEXT_SORT_INFO, sortColumnNames.toString() );
-
- // This is to turn off type check for potential corner cases - for internal use only;
- if (System.getenv("zebra_output_checktype") != null && System.getenv("zebra_output_checktype").equals("no")) {
- properties.setProperty( UDFCONTEXT_OUTPUT_CHECKTYPE, "no");
- }
+ // This is to turn off type check for potential corner cases - for internal use only;
+ if (System.getenv("zebra_output_checktype") != null && System.getenv("zebra_output_checktype").equals("no")) {
+ properties.setProperty( UDFCONTEXT_OUTPUT_CHECKTYPE, "no");
+ }
}
@SuppressWarnings("unchecked")
@Override
public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
throws IOException {
- return new TableOutputFormat();
+ return new BasicTableOutputFormat();
}
@SuppressWarnings("unchecked")
@Override
public void prepareToWrite(RecordWriter writer)
throws IOException {
- tableRecordWriter = (TableRecordWriter)writer;
- if( tableRecordWriter == null ) {
- throw new IOException( "Invalid type of writer. Expected type:
TableRecordWriter." );
- }
+ tableRecordWriter = writer;
+ if( tableRecordWriter == null ) {
+ throw new IOException( "Invalid type of writer. Expected type:
TableRecordWriter." );
+ }
}
@Override
public String relToAbsPathForStoreLocation(String location, Path curDir)
throws IOException {
- return LoadFunc.getAbsolutePath( location, curDir );
+ return LoadFunc.getAbsolutePath( location, curDir );
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
- Configuration conf = job.getConfiguration();
- conf.set( OUTPUT_STORAGEHINT, storageHintString );
- conf.set( OUTPUT_PATH, location );
-
- // Get schema string and sorting info from UDFContext and re-store them to
- // job config.
- Properties properties = UDFContext.getUDFContext().getUDFProperties(
- this.getClass(), new String[]{ udfContextSignature } );
- conf.set( OUTPUT_SCHEMA, properties.getProperty( UDFCONTEXT_OUTPUT_SCHEMA ) );
- conf.set( SORT_INFO, properties.getProperty( UDFCONTEXT_SORT_INFO ) );
-
- // Get checktype information from UDFContext and re-store it to job config;
- if (properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE) != null && properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE).equals("no")) {
- conf.setBoolean(OUTPUT_CHECKTYPE, false);
+ Configuration conf = job.getConfiguration();
+
+ String[] outputs = location.split(",");
+
+ if (outputs.length == 1) {
+ BasicTableOutputFormat.setOutputPath(job, new Path(location));
+ } else if (outputs.length > 1) {
+ if (partitionClass == null) {
+ try {
+ partitionClass = (Class<? extends ZebraOutputPartition>) conf.getClassByName(partitionClassString);
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
}
+
+ Path[] paths = new Path[outputs.length];
+ for (int i=0; i<paths.length; i++) {
+ paths[i] = new Path(outputs[i]);
+ }
+
+ BasicTableOutputFormat.setMultipleOutputs(job, partitionClass, partitionClassArgumentsString, paths);
+ } else {
+ throw new IOException( "Invalid location : " + location);
+ }
+
+ // Get schema string and sorting info from UDFContext and re-store them to
+ // job config.
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(
+ this.getClass(), new String[]{ udfContextSignature } );
+ ZebraSchema zSchema = ZebraSchema.createZebraSchema(properties.getProperty(UDFCONTEXT_OUTPUT_SCHEMA));
+ ZebraSortInfo zSortInfo = ZebraSortInfo.createZebraSortInfo(properties.getProperty(UDFCONTEXT_SORT_INFO), null);
+ ZebraStorageHint zStorageHint = ZebraStorageHint.createZebraStorageHint(storageHintString);
+ try {
+ BasicTableOutputFormat.setStorageInfo(job, zSchema, zStorageHint, zSortInfo);
+ } catch (ParseException e) {
+ throw new IOException("Invalid storage info: " + e.getMessage());
+ }
+
+ // Get checktype information from UDFContext and re-store it to job config;
+ if (properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE) != null && properties.getProperty(UDFCONTEXT_OUTPUT_CHECKTYPE).equals("no")) {
+ conf.setBoolean(OUTPUT_CHECKTYPE, false);
+ }
}
@Override
public void storeSchema(ResourceSchema schema, String location, Job job)
throws IOException {
- // no-op. We do close at cleanupJob().
- BasicTable.Writer write = new BasicTable.Writer( new Path( location ),
- job.getConfiguration());
- write.close();
+ //TODO: This is temporary - we will do close at cleanupJob() when OutputCommitter is ready.
+ BasicTableOutputFormat.close(job);
}
@Override
public void setStoreFuncUDFContextSignature(String signature) {
- udfContextSignature = signature;
+ udfContextSignature = signature;
}
@Override
public void storeStatistics(ResourceStatistics stats, String location,
Job job) throws IOException {
- // no-op
- }
-
-}
-
-/**
- *
- * Table OutputFormat
- *
- */
-class TableOutputFormat extends OutputFormat<BytesWritable, Tuple> {
- @Override
- public void checkOutputSpecs(JobContext job) throws IOException, InterruptedException {
- Configuration conf = job.getConfiguration();
- String location = conf.get( TableStorer.OUTPUT_PATH );
- String schemaStr = conf.get( TableStorer.OUTPUT_SCHEMA );
- String storageHint = conf.get( TableStorer.OUTPUT_STORAGEHINT );
- String sortColumnNames = conf.get( TableStorer.SORT_INFO );
-
- BasicTable.Writer writer = new BasicTable.Writer( new Path( location ),
- schemaStr, storageHint, sortColumnNames, null, conf );
- writer.finish();
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext taContext)
- throws IOException, InterruptedException {
- return new TableOutputCommitter() ;
- }
-
- @Override
- public org.apache.hadoop.mapreduce.RecordWriter<BytesWritable, Tuple> getRecordWriter(
- TaskAttemptContext taContext) throws IOException, InterruptedException {
- return new TableRecordWriter( taContext );
- }
-
-}
-
-// TODO: make corresponding changes for commit and cleanup. Currently, no-ops.
-class TableOutputCommitter extends OutputCommitter {
- @Override
- public void abortTask(TaskAttemptContext taContext) throws IOException {
- // TODO Auto-generated method stub
- }
-
- @Override
- public void cleanupJob(JobContext jobContext) throws IOException {
-// Configuration conf = jobContext.getConfiguration();
-// String location = conf.get( TableStorer.OUTPUT_PATH );
-// BasicTable.Writer write = new BasicTable.Writer( new Path( location ), conf );
-// write.close();
+ // no-op
}
-
- @Override
- public void commitTask(TaskAttemptContext taContext) throws IOException {
- int i = 0;
- i++;
- // TODO Auto-generated method stub
- }
-
- @Override
- public boolean needsTaskCommit(TaskAttemptContext taContext) throws IOException {
- return false;
- }
-
- @Override
- public void setupJob(JobContext jobContext) throws IOException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void setupTask(TaskAttemptContext taContext) throws IOException {
- // TODO Auto-generated method stub
- }
-
-}
-
-/**
- *
- * Table RecordWriter
- *
- */
-class TableRecordWriter extends RecordWriter<BytesWritable, Tuple> {
- final private BytesWritable KEY0 = new BytesWritable(new byte[0]);
- private BasicTable.Writer writer;
- private TableInserter inserter;
- private int[] sortColIndices = null;
- KeyGenerator builder;
- Tuple t;
-
- public TableRecordWriter(TaskAttemptContext taContext) throws IOException {
- Configuration conf = taContext.getConfiguration();
-
- String path = conf.get(TableStorer.OUTPUT_PATH);
- writer = new BasicTable.Writer( new Path( path ), conf );
-
- if (writer.getSortInfo() != null)
- {
- sortColIndices = writer.getSortInfo().getSortIndices();
- SortInfo sortInfo = writer.getSortInfo();
- String[] sortColNames = sortInfo.getSortColumnNames();
- org.apache.hadoop.zebra.schema.Schema schema = writer.getSchema();
-
- byte[] types = new byte[sortColNames.length];
- for(int i =0 ; i < sortColNames.length; ++i){
- types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
- }
- t = TypesUtils.createTuple(sortColNames.length);
- builder = makeKeyBuilder(types);
- }
-
- boolean checkType = conf.getBoolean(TableStorer.OUTPUT_CHECKTYPE, true);
- inserter = writer.getInserter("patition-" +
taContext.getTaskAttemptID().getTaskID().getId(), false, checkType);
- }
-
- @Override
- public void close(TaskAttemptContext taContext) throws IOException {
- inserter.close();
- writer.finish();
- }
-
- private KeyGenerator makeKeyBuilder(byte[] elems) {
- ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
- for (int i = 0; i < elems.length; ++i) {
- exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
- }
- return new KeyGenerator(ExprUtils.tupleComparator(exprs));
- }
-
- @Override
- public void write(BytesWritable key, Tuple value) throws IOException {
- System.out.println( "Tuple: " + value.toDelimitedString(",") );
- if (sortColIndices != null) {
- for(int i =0; i < sortColIndices.length;++i) {
- t.set(i, value.get(sortColIndices[i]));
- }
- key = builder.generateKey(t);
- } else if (key == null) {
- key = KEY0;
- }
- inserter.insert(key, value);
- }
-
}
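On the Pig side, the new three-argument TableStorer constructor above forwards its third argument to the partitioner via ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS. A minimal sketch of that call path, taken from TestMultipleOutputs2 below (the paths and argument string are that test's; an initialized PigServer with a loaded 'records' relation is assumed):

    // Store into three tables; the trailing 'us,india,japan' string is made
    // available to the partitioner through the job configuration.
    pigServer.store("records", "us_0,india_1,japan_2",
        TableStorer.class.getCanonicalName() +
        "('[word,count]', " +
        "'org.apache.hadoop.zebra.pig.TestMultipleOutputs2$OutputPartitionerClass', " +
        "'us,india,japan')");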
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java?rev=935968&r1=935967&r2=935968&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMixedType1.java Tue Apr 20 15:51:38 2010
@@ -55,7 +55,7 @@ public class TestMixedType1 extends Base
System.out.println("ONCE SETUP !! ---------");
init();
- path = getTableFullPath("");
+ path = getTableFullPath("TestMixedType1");
removeDir(path);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
Added:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java?rev=935968&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs1.java Tue Apr 20 15:51:38 2010
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.zebra.BaseTestCase;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.Iterator;
+import junit.framework.Assert;
+
+/**
+ * Assume the input files contain rows of word and count, separated by a tab:
+ *
+ * <pre>
+ * us 2
+ * japan 2
+ * india 4
+ * us 2
+ * japan 1
+ * india 3
+ * nouse 5
+ * nowhere 4
+ *
+ */
+public class TestMultipleOutputs1 extends BaseTestCase implements Tool {
+ static String inputPath;
+ static String inputFileName = "multi-input.txt";
+ public static String sortKey = null;
+
+ @Before
+ public void setUp() throws Exception {
+ init();
+
+ inputPath = getTableFullPath(inputFileName).toString();
+
+ writeToFile(inputPath);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (mode == TestMode.local) {
+ pigServer.shutdown();
+ }
+ }
+
+ public static void writeToFile (String inputFile) throws IOException{
+ if (mode == TestMode.local) {
+ FileWriter fstream = new FileWriter(inputFile);
+ BufferedWriter out = new BufferedWriter(fstream);
+ out.write("us\t2\n");
+ out.write("japan\t2\n");
+ out.write("india\t4\n");
+ out.write("us\t2\n");
+ out.write("japan\t1\n");
+ out.write("india\t3\n");
+ out.write("nouse\t5\n");
+ out.write("nowhere\t4\n");
+ out.close();
+ }
+
+ if (mode == TestMode.cluster) {
+ FSDataOutputStream fout = fs.create(new Path (inputFile));
+ fout.writeBytes("us\t2\n");
+ fout.writeBytes("japan\t2\n");
+ fout.writeBytes("india\t4\n");
+ fout.writeBytes("us\t2\n");
+ fout.writeBytes("japan\t1\n");
+ fout.writeBytes("india\t3\n");
+ fout.writeBytes("nouse\t5\n");
+ fout.writeBytes("nowhere\t4\n");
+ fout.close();
+ }
+ }
+
+ // test no sort key;
+ @Test
+ public void test1() throws ParseException, IOException, org.apache.hadoop.zebra.parser.ParseException, Exception {
+ // Load data;
+ String query = "records = LOAD '" + inputPath + "' as (word:chararray,
count:int);";
+ System.out.println("query = " + query);
+ pigServer.registerQuery(query);
+
+ Iterator<Tuple> it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur);
+ }
+
+ // Store using multiple outputs;
+ String outputPaths = "us,india,japan";
+ removeDir(getTableFullPath("us"));
+ removeDir(getTableFullPath("india"));
+ removeDir(getTableFullPath("japan"));
+
+ query = "store records into '" + outputPaths + "' using
org.apache.hadoop.zebra.pig.TableStorer('[word,count]'," +
+
"'org.apache.hadoop.zebra.pig.TestMultipleOutputs1$OutputPartitionerClass');";
+ System.out.println("query = " + query);
+ pigServer.registerQuery(query);
+
+ // Validate results;
+ query = "records = LOAD '" + "us"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ int count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("us", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("us", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ } else if (count == 3) {
+ Assert.assertEquals("nouse", RowValue.get(0));
+ Assert.assertEquals(5, RowValue.get(1));
+ } else if (count == 4) {
+ Assert.assertEquals("nowhere", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 4);
+
+ query = "records = LOAD '" + "india"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(3, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 2);
+
+ query = "records = LOAD '" + "japan"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(1, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 2);
+ }
+
+ //Test sort key on word;
+ @Test
+ public void test2() throws ParseException, IOException, org.apache.hadoop.zebra.parser.ParseException, Exception {
+ // Load data;
+ String query = "a = LOAD '" + inputPath + "' as (word:chararray,
count:int);";
+ System.out.println("query = " + query);
+ pigServer.registerQuery(query);
+
+ query = "records = order a by word;";
+ System.out.println("query = " + query);
+ pigServer.registerQuery(query);
+
+ Iterator<Tuple> it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur);
+ }
+
+ // Store using multiple outputs;
+ String outputPaths = "us,india,japan";
+ removeDir(getTableFullPath("us"));
+ removeDir(getTableFullPath("india"));
+ removeDir(getTableFullPath("japan"));
+ ExecJob pigJob = pigServer
+ .store(
+ "records",
+ outputPaths,
+ TableStorer.class.getCanonicalName() +
+ "('[word,count]',
'org.apache.hadoop.zebra.pig.TestMultipleOutputs1$OutputPartitionerClass')");
+
+ Assert.assertNull(pigJob.getException());
+
+ // Validate results;
+ query = "records = LOAD '" + "us"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ int count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("nouse", RowValue.get(0));
+ Assert.assertEquals(5, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("nowhere", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ } else if (count == 3) {
+ Assert.assertEquals("us", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ } else if (count == 4) {
+ Assert.assertEquals("us", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 4);
+
+ query = "records = LOAD '" + "india"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(3, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 2);
+
+ query = "records = LOAD '" + "japan"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(1, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 2);
+ }
+
+ //Test sort key on word and count;
+ @Test
+ public void test3() throws ParseException, IOException, org.apache.hadoop.zebra.parser.ParseException, Exception {
+ // Load data;
+ String query = "a = LOAD '" + inputPath + "' as (word:chararray,
count:int);";
+ System.out.println("query = " + query);
+ pigServer.registerQuery(query);
+
+ query = "records = order a by word, count;";
+ System.out.println("query = " + query);
+ pigServer.registerQuery(query);
+
+ Iterator<Tuple> it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur);
+ }
+
+ // Store using multiple outputs;
+ String outputPaths = "us,india,japan";
+ removeDir(getTableFullPath("us"));
+ removeDir(getTableFullPath("india"));
+ removeDir(getTableFullPath("japan"));
+ ExecJob pigJob = pigServer
+ .store(
+ "records",
+ outputPaths,
+ TableStorer.class.getCanonicalName() +
+ "('[word,count]',
'org.apache.hadoop.zebra.pig.TestMultipleOutputs1$OutputPartitionerClass')");
+
+ Assert.assertNull(pigJob.getException());
+
+ // Validate results;
+ query = "records = LOAD '" + "us"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ int count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("nouse", RowValue.get(0));
+ Assert.assertEquals(5, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("nowhere", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ } else if (count == 3) {
+ Assert.assertEquals("us", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ } else if (count == 4) {
+ Assert.assertEquals("us", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 4);
+
+ query = "records = LOAD '" + "india"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(3, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 2);
+
+ query = "records = LOAD '" + "japan"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(1, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 2);
+ }
+
+ //Negative test case: invalid partition class;
+ @Test (expected = IOException.class)
+ public void testNegative1() throws ParseException, IOException, org.apache.hadoop.zebra.parser.ParseException, Exception {
+ // Load data;
+ String query = "a = LOAD '" + inputPath + "' as (word:chararray,
count:int);";
+ System.out.println("query = " + query);
+ pigServer.registerQuery(query);
+
+ query = "records = order a by word, count;";
+ System.out.println("query = " + query);
+ pigServer.registerQuery(query);
+
+ Iterator<Tuple> it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur);
+ }
+
+ // Store using multiple outputs;
+ String outputPaths = "us,india,japan";
+ removeDir(getTableFullPath("us"));
+ removeDir(getTableFullPath("india"));
+ removeDir(getTableFullPath("japan"));
+ pigServer
+ .store(
+ "records",
+ outputPaths,
+ TableStorer.class.getCanonicalName() +
+ "('[word,count]',
'org.apache.hadoop.zebra.pig.notexistingclass')");
+ }
+
+ public static class OutputPartitionerClass extends ZebraOutputPartition {
+
+ @Override
+ public int getOutputPartition(BytesWritable key, Tuple value) {
+ String reg = null;
+ try {
+ reg = (String) (value.get(0));
+ } catch (Exception e) {
+ //
+ }
+
+ if (reg.equals("us"))
+ return 0;
+ if (reg.equals("india"))
+ return 1;
+ if (reg.equals("japan"))
+ return 2;
+
+ return 0;
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ TestMultipleOutputs1 test = new TestMultipleOutputs1();
+
+ test.setUp();
+ test.test1();
+ test.tearDown();
+
+ test.setUp();
+ test.test2();
+ test.tearDown();
+
+ test.setUp();
+ test.test3();
+ test.tearDown();
+
+ test.setUp();
+ test.testNegative1();
+ test.tearDown();
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ conf = new Configuration();
+
+ int res = ToolRunner.run(conf, new TestMultipleOutputs1(), args);
+ System.out.println("PASS");
+ System.exit(res);
+ }
+}
Added:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java?rev=935968&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMultipleOutputs2.java Tue Apr 20 15:51:38 2010
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.pig;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.zebra.BaseTestCase;
+import org.apache.hadoop.zebra.mapreduce.BasicTableOutputFormat;
+import org.apache.hadoop.zebra.mapreduce.ZebraOutputPartition;
+import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.Iterator;
+import junit.framework.Assert;
+
+/**
+ * Assume the input files contain rows of word and count, separated by a tab:
+ *
+ * <pre>
+ * us 2
+ * japan 2
+ * india 4
+ * us 2
+ * japan 1
+ * india 3
+ * nouse 5
+ * nowhere 4
+ *
+ */
+public class TestMultipleOutputs2 extends BaseTestCase implements Tool {
+ static String inputPath;
+ static String inputFileName = "multi-input.txt";
+ public static String sortKey = null;
+
+ @Before
+ public void setUp() throws Exception {
+ init();
+
+ inputPath = getTableFullPath(inputFileName).toString();
+
+ writeToFile(inputPath);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (mode == TestMode.local) {
+ pigServer.shutdown();
+ }
+ }
+
+ public static void writeToFile (String inputFile) throws IOException{
+ if (mode == TestMode.local) {
+ FileWriter fstream = new FileWriter(inputFile);
+ BufferedWriter out = new BufferedWriter(fstream);
+ out.write("us\t2\n");
+ out.write("japan\t2\n");
+ out.write("india\t4\n");
+ out.write("us\t2\n");
+ out.write("japan\t1\n");
+ out.write("india\t3\n");
+ out.write("nouse\t5\n");
+ out.write("nowhere\t4\n");
+ out.close();
+ }
+
+ if (mode == TestMode.cluster) {
+ FSDataOutputStream fout = fs.create(new Path (inputFile));
+ fout.writeBytes("us\t2\n");
+ fout.writeBytes("japan\t2\n");
+ fout.writeBytes("india\t4\n");
+ fout.writeBytes("us\t2\n");
+ fout.writeBytes("japan\t1\n");
+ fout.writeBytes("india\t3\n");
+ fout.writeBytes("nouse\t5\n");
+ fout.writeBytes("nowhere\t4\n");
+ fout.close();
+ }
+ }
+
+ @Test
+ public void test1() throws ParseException, IOException, org.apache.hadoop.zebra.parser.ParseException, Exception {
+ // Load data;
+ String query = "records = LOAD '" + inputPath + "' as (word:chararray,
count:int);";
+ System.out.println("query = " + query);
+ pigServer.registerQuery(query);
+
+ Iterator<Tuple> it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ Tuple cur = it.next();
+ System.out.println(cur);
+ }
+
+ // Store using multiple outputs;
+ String outputPaths = "us_0,india_1,japan_2";
+ removeDir(getTableFullPath("us_0"));
+ removeDir(getTableFullPath("india_1"));
+ removeDir(getTableFullPath("japan_2"));
+ ExecJob pigJob = pigServer
+ .store(
+ "records",
+ outputPaths,
+ TableStorer.class.getCanonicalName() +
+ "('[word,count]',
'org.apache.hadoop.zebra.pig.TestMultipleOutputs2$OutputPartitionerClass',
'us,india,japan')");
+
+ Assert.assertNull(pigJob.getException());
+
+ // Validate results;
+ query = "records = LOAD '" + "us_0"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ int count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("us", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("us", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ } else if (count == 3) {
+ Assert.assertEquals("nouse", RowValue.get(0));
+ Assert.assertEquals(5, RowValue.get(1));
+ } else if (count == 4) {
+ Assert.assertEquals("nowhere", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 4);
+
+ query = "records = LOAD '" + "india_1"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(4, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("india", RowValue.get(0));
+ Assert.assertEquals(3, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 2);
+
+ query = "records = LOAD '" + "japan_2"
+ + "' USING org.apache.hadoop.zebra.pig.TableLoader();";
+
+ count = 0;
+ System.out.println(query);
+ pigServer.registerQuery(query);
+ it = pigServer.openIterator("records");
+ while (it.hasNext()) {
+ count ++;
+ Tuple RowValue = it.next();
+ System.out.println(RowValue);
+ if (count == 1) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(2, RowValue.get(1));
+ } else if (count == 2) {
+ Assert.assertEquals("japan", RowValue.get(0));
+ Assert.assertEquals(1, RowValue.get(1));
+ }
+ }
+ Assert.assertEquals(count, 2);
+ }
+
+ public static class OutputPartitionerClass extends ZebraOutputPartition {
+
+ @Override
+ public int getOutputPartition(BytesWritable key, Tuple value) {
+ String reg = null;
+
+ try {
+ reg = (String) (value.get(0));
+ } catch (Exception e) {
+ //
+ }
+
+ String argumentsString = conf.get(BasicTableOutputFormat.ZEBRA_OUTPUT_PARTITIONER_CLASS_ARGUMENTS);
+
+ String[] arguments = argumentsString.split(",");
+
+ if (reg.equals(arguments[0]))
+ return 0;
+ if (reg.equals(arguments[1]))
+ return 1;
+ if (reg.equals(arguments[2]))
+ return 2;
+
+ return 0;
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ TestMultipleOutputs2 test = new TestMultipleOutputs2();
+
+ test.setUp();
+ test.test1();
+ test.tearDown();
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ conf = new Configuration();
+
+ int res = ToolRunner.run(conf, new TestMultipleOutputs2(), args);
+ System.out.println("PASS");
+ System.exit(res);
+ }
+}