Sure thing. Below is the entire source code for the store func.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file to you under the Apache License,
 * Version 2.0 (the "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the
 * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
 * either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
package com.bluekai.analytics.pig.storage;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.text.MessageFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.StorageUtil;


public class BkMultiStorage extends StoreFunc {

    private String outputPath;                       // User-specified output path
    private MessageFormat messageFormat = new MessageFormat("/dev/null");
    private List<Integer> splitFieldIndex = new ArrayList<Integer>();  // Indexes of the key fields used for path construction
    private Set<Integer> skippedFieldIndex = new HashSet<Integer>();   // Indexes of the fields to skip when storing to HDFS
    private final String fieldDel;                   // Delimiter of the output record
    private Compression comp;                        // Compression type of the output data
    private RecordWriter<String, Tuple> writer;
    private static final TupleFactory tf = TupleFactory.getInstance();
    private static int constructed_times = 0;
    private static int entred_setLocation = 0;

    // Compression types supported by this store
    enum Compression {
        none, bz2, bz, gz;
    };

    public BkMultiStorage(String output, String splitFieldsIndex) {
        this(output, splitFieldsIndex, "");
    }

    public BkMultiStorage(String output, String splitFieldsIndex, String skippedFieldsIndex) {
        this(output, splitFieldsIndex, skippedFieldsIndex, "none");
    }

    public BkMultiStorage(String output, String splitFieldsIndex, String skippedFieldsIndex, String compression) {
        this(output, splitFieldsIndex, skippedFieldsIndex, compression, "\\t");
    }

    // TODO: make this method more robust with a customized split separator
    private List<Integer> fieldsParser(String fieldString) {
        if (fieldString.trim().isEmpty()) {
            return new ArrayList<Integer>();
        }
        List<Integer> output = new ArrayList<Integer>();
        for (String part : fieldString.trim().split(",")) {
            if (part != null && !part.trim().isEmpty())
                output.add(Integer.valueOf(part.trim()));
        }
        return output;
    }
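    // Illustrative only (hypothetical inputs, not from the thread):
    //   fieldsParser("2,1") -> [2, 1]
    //   fieldsParser("")    -> []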

    /**
     * Constructor
     *
     * @param output
     *          Output path pattern; {0}, {1}, ... are replaced with the key field values
     * @param splitFieldsIndex
     *          Comma-separated indexes of the key fields used to build the path
     * @param skippedFieldsIndex
     *          Comma-separated indexes of the fields to drop from the stored record
     * @param compression
     *          'bz2', 'bz', 'gz' or 'none'
     * @param fieldDel
     *          Output record field delimiter
     */
    public BkMultiStorage(String output, String splitFieldsIndex, String skippedFieldsIndex, String compression, String fieldDel) {
        this.messageFormat = new MessageFormat(output);
        this.splitFieldIndex = fieldsParser(splitFieldsIndex);
        this.skippedFieldIndex = new HashSet<Integer>(fieldsParser(skippedFieldsIndex));
        this.fieldDel = fieldDel;
        try {
            this.comp = (compression == null) ? Compression.none : Compression.valueOf(compression.toLowerCase());
        } catch (IllegalArgumentException e) {
            System.err.println("Exception when converting compression string: " + compression + " to enum. No compression will be used");
            this.comp = Compression.none;
        }
        constructed_times++;
    }
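    // Illustrative only (hypothetical arguments, following the form suggested later in the thread):
    //   new BkMultiStorage("ns_{0}/site_{1}", "2,1", "1,2")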

    //--------------------------------------------------------------------------
    // Implementation of StoreFunc

    @Override
    public void putNext(Tuple tuple) throws IOException {
        // Construct the output key: the key is the path the record should be written to.
        int tupleSize = tuple.size();
        Object[] pathStore = new Object[splitFieldIndex.size()];
        int startIndex = 0;
        for (int index : splitFieldIndex) {
            if (tupleSize <= index) {
                throw new IOException("split field index:" + index + " >= tuple size:" + tupleSize);
            }
            pathStore[startIndex++] = String.valueOf(tuple.get(index));
        }
        // Construct a new tuple with the skipped fields removed, or reuse the input tuple
        // if there is nothing to skip when writing to HDFS.
        Tuple writeTuple;
        if (this.skippedFieldIndex.size() == 0) {
            writeTuple = tuple;
        } else {
            writeTuple = tf.newTuple();
            for (int i = 0; i < tupleSize; i++) {
                if (!this.skippedFieldIndex.contains(i))
                    writeTuple.append(tuple.get(i));
            }
        }

        String outputPath = messageFormat.format(pathStore);
        try {
            writer.write(outputPath, writeTuple);
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }
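    // Illustrative only (hypothetical values): with the pattern "ns_{0}/site_{1}" and
    // splitFieldsIndex "2,1", a tuple whose fields 2 and 1 hold "foo" and "bar" is
    // written under the key "ns_foo/site_bar".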

    @SuppressWarnings("unchecked")
    @Override
    public OutputFormat getOutputFormat() throws IOException {
        MultiStorageOutputFormat format = new MultiStorageOutputFormat();
        format.setKeyValueSeparator(fieldDel);
        return format;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    // Strip the templated part of the path: keep only the components that come before
    // the first one containing a {n} placeholder.
    private String trimPath(String path) {
        StringBuilder sb = new StringBuilder();
        String[] parts = path.split("/");
        for (String part : parts) {
            if (part.contains("{") && part.contains("}")) {
                break;
            }
            sb.append(part);
            sb.append("/");
        }
        if (sb.lastIndexOf("/") >= sb.length() - 1) {
            sb.deleteCharAt(sb.lastIndexOf("/"));
        }
        return sb.toString();
    }
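    // Illustrative only (hypothetical path): trimPath("/user/out/ns_{0}/site_{1}") returns "/user/out".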
    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        job.getConfiguration().set("mapred.textoutputformat.separator", "");
        FileOutputFormat.setOutputPath(job, new Path(location));
        if (comp == Compression.bz2 || comp == Compression.bz) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        } else if (comp == Compression.gz) {
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        }
    }
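    // A variant discussed later in the thread (hypothetical, not what the code above does):
    // set the real output root rather than the templated location, e.g.
    //   FileOutputFormat.setOutputPath(job, new Path(trimPath(location)));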

    //--------------------------------------------------------------------------
    // Implementation of OutputFormat
    // This class handles the details of how the output is serialized from tuple to disk.

    public static class MultiStorageOutputFormat extends TextOutputFormat<String, Tuple> {

        private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
        static {
            NUMBER_FORMAT.setMinimumIntegerDigits(5);
            NUMBER_FORMAT.setGroupingUsed(false);
        }

        private String keyValueSeparator = "\\t";
        private byte fieldDel = '\t';

        @Override
        public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
            // Overriding the super class method so that the output directory is not checked for
            // existence, since we don't want to store anything there anyway.
        }

        @Override
        public RecordWriter<String, Tuple> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {

            final TaskAttemptContext ctx = context;

            return new RecordWriter<String, Tuple>() {

                // One underlying writer per output path seen so far.
                private final Map<String, MyLineRecordWriter> storeMap = new HashMap<String, MyLineRecordWriter>();

                private static final int BUFFER_SIZE = 1024;

                private ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);

                @Override
                public void write(String key, Tuple val) throws IOException {
                    // Serialize the tuple fields into the buffer, then hand the line to the
                    // writer that owns the directory named by the key.
                    int sz = val.size();
                    for (int i = 0; i < sz; i++) {
                        Object field;
                        try {
                            field = val.get(i);
                        } catch (ExecException ee) {
                            throw ee;
                        }
                        StorageUtil.putField(mOut, field);
                        if (i != sz - 1) {
                            mOut.write(fieldDel);
                        }
                    }
                    getStore(key).write(null, new Text(mOut.toByteArray()));
                    mOut.reset();
                }

                @Override
                public void close(TaskAttemptContext context) throws IOException {
                    for (MyLineRecordWriter out : storeMap.values()) {
                        out.close(context);
                    }
                }

                // TODO: figure out whether the part is a map or a reduce part
                private MyLineRecordWriter getStore(String path) throws IOException {
                    return getStore(path, "part");
                }

                private MyLineRecordWriter getStore(String path, String part_name) throws IOException {
                    MyLineRecordWriter store = storeMap.get(path);
                    if (store == null) {
                        DataOutputStream os = createOutputStream(path, part_name);
                        store = new MyLineRecordWriter(os, String.valueOf(fieldDel));
                        storeMap.put(path, store);
                    }
                    return store;
                }


                private DataOutputStream createOutputStream(String path, String part_name) throws IOException {
                    Configuration conf = ctx.getConfiguration();
                    // Generate the actual file name based on the tuple-derived path:
                    // <work path>/<path>/<part_name>-<m|r>-<task id>
                    Path workOutputPath = ((FileOutputCommitter) getOutputCommitter(ctx)).getWorkPath();
                    TaskID taskId = ctx.getTaskAttemptID().getTaskID();
                    char m_or_r = taskId.isMap() ? 'm' : 'r';
                    // In new Path(parent, child), the lhs is the parent and the rhs is the child.
                    Path file = new Path(path, part_name + '-' + m_or_r + '-' + NUMBER_FORMAT.format(taskId.getId()));
                    Path output = new Path(workOutputPath, file);
                    System.err.println("path is " + path);
                    FileSystem fs = file.getFileSystem(conf);
                    FSDataOutputStream fileOut = fs.create(output, false);
                    return fileOut;
                }

            };
        }
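        // Resulting layout, matching the log quoted below:
        //   <work path>/<formatted key>/part-<m|r>-<task id>
        // e.g. .../_temporary/_attempt_local_0001_r_000000_0/prefix-a/part-r-00000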

        public void setKeyValueSeparator(String sep) {
            fieldDel = StorageUtil.parseFieldDel(sep);
        }

        //------------------------------------------------------------------------
        //

        protected static class MyLineRecordWriter
                extends TextOutputFormat.LineRecordWriter<WritableComparable, Text> {

            public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) {
                super(out, keyValueSeparator);
            }
        }
    }

}
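
As an aside on the case-sensitivity question further down in the thread: a small standalone
check along these lines (hypothetical scratch paths, not part of the store func) would show
whether the local filesystem collapses prefix-a and prefix-A into one directory, which would
explain the "File already exists" error from fs.create(output, false):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CaseCheck {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path base = new Path("/tmp/case-check");            // hypothetical scratch dir
        fs.mkdirs(new Path(base, "prefix-a"));
        // On a case-insensitive local filesystem (e.g. HFS+ on a Mac) this prints true,
        // in which case the second fs.create(..., false) in the store func fails.
        System.out.println("prefix-A exists: " + fs.exists(new Path(base, "prefix-A")));
        fs.delete(base, true);
    }
}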



On Tue, Nov 8, 2011 at 11:57 AM, Raghu Angadi <[email protected]> wrote:

> The path is certainly case sensitive. Not sure why this file already
> exists. You could post the relevant implementation here.
>
> On Tue, Nov 8, 2011 at 10:41 AM, felix gao <[email protected]> wrote:
>
> > Raghu,
> >
> > I changed the code to what you suggested, but I got an exception when I
> > try to store.
> > java.io.IOException: File already
> >
> >
> exists:file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-a/part-r-00000
> > at
> >
> org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:228)
> > at
> >
> >
> org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:335)
> > at
> >
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:368)
> > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:484)
> > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:465)
> > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:372)
> > at
> >
> >
> com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.createOutputStream(BkMultiStorage.java:325)
> > at
> >
> >
> com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.getStore(BkMultiStorage.java:304)
> > at
> >
> >
> com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.getStore(BkMultiStorage.java:298)
> > at
> >
> >
> com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.write(BkMultiStorage.java:285)
> > at
> >
> >
> com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.write(BkMultiStorage.java:261)
> > at
> >
> >
> com.bluekai.analytics.pig.storage.BkMultiStorage.putNext(BkMultiStorage.java:184)
> > at
> >
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:138)
> > at
> >
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:97)
> > at
> >
> >
> org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508)
> > at
> >
> >
> org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
> > at
> >
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:395)
> > at
> >
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:381)
> > at
> >
> >
> org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:250)
> > at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
> > at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
> > at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
> > at
> org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)
> >
> > where prefix-a is dynamically generated based on my tuple.
> >
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-0/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-1/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-2/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-3/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-4/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-5/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-6/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-7/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-8/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-9/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-A/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-B/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-C/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-D/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-E/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-F/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-G/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-H/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-I/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-J/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-K/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-L/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-M/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-N/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-O/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-P/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-Q/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-R/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-S/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-T/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-U/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-V/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-W/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-X/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-Y/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-Z/part-r-00000
> > final output stores at
> >
> >
> file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-a/part-r-00000
> >
> > I am wondering if it is because the path is case insensitive?
> >
> >
> >
> > Thanks,
> >
> > Felix
> >
> >
> >
> > On Fri, Nov 4, 2011 at 3:31 PM, Raghu Angadi <[email protected]> wrote:
> >
> > > You need to set output path to
> > > '/Users/felix/Documents/pig/multi_store_output'
> > > in your setStoreLocation().
> > > Alternately for clarity, you could modify your store udf to be more
> like:
> > > store load_log INTO '/Users/felix/Documents/pig/multi_store_output'
> using
> > > MyMultiStorage('ns_{0}/site_{1}', '2,1', '1,2');
> > >
> > > The reason FileOutputFormat needs a real path is that, at run time, Hadoop
> > > actually uses a temporary path and then moves the output to the correct path
> > > if the job succeeds.
> > >
> > > Raghu.
> > >
> > > On Thu, Nov 3, 2011 at 9:45 AM, Dmitriy Ryaboy <[email protected]>
> > wrote:
> > >
> > > > Don't use FileOutputFormat? Or rather, use something that extends it
> > and
> > > > overrides the validation.
> > > >
> > > > On Wed, Nov 2, 2011 at 3:19 PM, felix gao <[email protected]> wrote:
> > > >
> > > > > If you don't call that function, Hadoop is going to throw an exception
> > > > > for not having output set for the job.
> > > > > something like
> > > > > Caused by: org.apache.hadoop.mapred.InvalidJobConfException: Output
> > > > > directory not set.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:120)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileVisitor.visit(InputOutputFileValidator.java:87)
> > > > >
> > > > > So I have to set it and then somehow delete it after Pig completes.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Nov 2, 2011 at 3:00 PM, Ashutosh Chauhan <
> > [email protected]
> > > > > >wrote:
> > > > >
> > > > > > Then, don't call FileOutputFormat.setOutputPath(job, new
> > > > Path(location));
> > > > > > Looks like I am missing something here.
> > > > > >
> > > > > > Ashutosh
> > > > > > On Wed, Nov 2, 2011 at 14:10, felix gao <[email protected]>
> wrote:
> > > > > >
> > > > > > > Ashutosh,
> > > > > > >
> > > > > > > The problem is I don't want to use that location at all since I am
> > > > > > > constructing the output location based on tuple input. The
> > location
> > > > is
> > > > > > just
> > > > > > > a dummy holder for me to substitute the right parameters
> > > > > > >
> > > > > > > Felix
> > > > > > >
> > > > > > > On Wed, Nov 2, 2011 at 10:47 AM, Ashutosh Chauhan <
> > > > > [email protected]
> > > > > > > >wrote:
> > > > > > >
> > > > > > > > Hey Felix,
> > > > > > > >
> > > > > > > > >> The only problem is that in the setStoreLocation function
> we
> > > > have
> > > > > to
> > > > > > > > call
> > > > > > > > >> FileOutputFormat.setOutputPath(job, new Path(location));
> > > > > > > >
> > > > > > > > Can't you massage the location into the appropriate string you want?
> > > > > > > >
> > > > > > > > Ashutosh
> > > > > > > >
> > > > > > > > On Tue, Nov 1, 2011 at 18:07, felix gao <[email protected]>
> > > wrote:
> > > > > > > >
> > > > > > > > > I have written a custom store function that is primarily based on
> > > > > > > > > the MultiStorage store function.  The way I use it is
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > store load_log INTO
> > > > > > > > >
> > '/Users/felix/Documents/pig/multi_store_output/ns_{0}/site_{1}'
> > > > > using
> > > > > > > > > MyMultiStorage('2,1', '1,2');
> > > > > > > > > where {0} and {1} will be substituted with the tuple values at
> > > > > > > > > index 0 and index 1.  Everything is fine and all the data is
> > > > > > > > > written to the correct place.
> > > > > > > > >  The only problem is that in the setStoreLocation function
> we
> > > > have
> > > > > to
> > > > > > > > call
> > > > > > > > > FileOutputFormat.setOutputPath(job, new Path(location)); i
> > have
> > > > > > > > >
> > 'Users/felix/Documents/pig/multi_store_output/ns_{0}/site_{1}'
> > > as
> > > > > my
> > > > > > > > output
> > > > > > > > > location so there is actually a folder created in my fs
> with
> > > > ns_{0}
> > > > > > > > > and site_{1}.  Is there a way to tell hadoop not to create
> > > those
> > > > > > output
> > > > > > > > > directory?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Felix
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
