Sure thing. Below is the entire source code for the storefunc.
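For context, the calling convention follows what Raghu suggested further down in this thread: the real output directory goes in the INTO clause, and the path template plus the field indexes go to the constructor. Something like this (MyMultiStorage there corresponds to the BkMultiStorage class below; the exact template and indexes are just illustrative):

    store load_log INTO '/Users/felix/Documents/pig/multi_store_output' using
        MyMultiStorage('ns_{0}/site_{1}', '2,1', '1,2');

The first constructor argument is expanded per tuple via java.text.MessageFormat in putNext(), the second is the comma-separated list of field indexes used to fill the template, and the third is the list of field indexes to drop from the stored record.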
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
 * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and limitations under the License.
 */
package com.bluekai.analytics.pig.storage;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.text.MessageFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.StorageUtil;

public class BkMultiStorage extends StoreFunc {

  private String outputPath;                                            // user-specified output path
  private MessageFormat messageFormat = new MessageFormat("/dev/null"); // path template, e.g. "ns_{0}/site_{1}"
  private List<Integer> splitFieldIndex = new ArrayList<Integer>();     // indexes of the key fields used for path construction
  private Set<Integer> skippedFieldIndex = new HashSet<Integer>();      // indexes of the key fields to skip when storing to HDFS
  private final String fieldDel;                                        // delimiter of the output record
  private Compression comp;                                             // compression type of output data

  private RecordWriter<String, Tuple> writer;
  private static final TupleFactory tf = TupleFactory.getInstance();
  private static int constructed_times = 0;
  private static int entred_setLocation = 0;

  // Compression types supported by this store
  enum Compression {
    none, bz2, bz, gz;
  };

  public BkMultiStorage(String output, String splitFieldsIndex) {
    this(output, splitFieldsIndex, "");
  }

  public BkMultiStorage(String output, String splitFieldsIndex, String skippedFieldsIndex) {
    this(output, splitFieldsIndex, skippedFieldsIndex, "none");
  }

  public BkMultiStorage(String output, String splitFieldsIndex, String skippedFieldsIndex, String compression) {
    this(output, splitFieldsIndex, skippedFieldsIndex, compression, "\\t");
  }

  // TODO: make this method more robust with a customizable split separator
  private List<Integer> fieldsParser(String fieldString) {
    if (fieldString.trim().isEmpty()) {
      return new ArrayList<Integer>();
    }
    List<Integer> output = new ArrayList<Integer>();
    for (String part : fieldString.trim().split(",")) {
      if (part != null && !part.isEmpty())
        output.add(Integer.valueOf(part));
    }
    return output;
  }

  /**
   * Constructor
   *
   * @param output             output path template (parent dir plus {n} placeholders)
   * @param splitFieldsIndex   key field indexes used to fill in the template
   * @param skippedFieldsIndex field indexes dropped from the stored record
   * @param compression        'bz2', 'bz', 'gz' or 'none'
   * @param fieldDel           output record field delimiter
   */
  public BkMultiStorage(String output, String splitFieldsIndex, String skippedFieldsIndex,
      String compression, String fieldDel) {
    this.messageFormat = new MessageFormat(output);
    this.splitFieldIndex = fieldsParser(splitFieldsIndex);
    this.skippedFieldIndex = new HashSet<Integer>(fieldsParser(skippedFieldsIndex));
    this.fieldDel = fieldDel;
    try {
      this.comp = (compression == null) ? Compression.none : Compression.valueOf(compression.toLowerCase());
    } catch (IllegalArgumentException e) {
      System.err.println("Exception when converting compression string: " + compression
          + " to enum. No compression will be used");
      this.comp = Compression.none;
    }
    constructed_times++;
  }

  //--------------------------------------------------------------------------
  // Implementation of StoreFunc

  @Override
  public void putNext(Tuple tuple) throws IOException {
    // Construct the output key: the key is the path to write to,
    // built by filling the MessageFormat template with the split field values.
    int tupleSize = tuple.size();
    Object[] pathStore = new Object[splitFieldIndex.size()];
    int startIndex = 0;
    for (int index : splitFieldIndex) {
      if (tupleSize <= index) {
        throw new IOException("split field index:" + index + " >= tuple size:" + tupleSize);
      }
      pathStore[startIndex++] = String.valueOf(tuple.get(index));
    }

    // Construct a new tuple with the skipped fields removed, or reuse the input tuple
    // if there is nothing to skip.
    Tuple writeTuple;
    if (this.skippedFieldIndex.size() == 0) {
      writeTuple = tuple;
    } else {
      writeTuple = tf.newTuple();
      for (int i = 0; i < tupleSize; i++) {
        if (!this.skippedFieldIndex.contains(i))
          writeTuple.append(tuple.get(i));
      }
    }

    String outputPath = messageFormat.format(pathStore);
    try {
      writer.write(outputPath, writeTuple);
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public OutputFormat getOutputFormat() throws IOException {
    MultiStorageOutputFormat format = new MultiStorageOutputFormat();
    format.setKeyValueSeparator(fieldDel);
    return format;
  }

  @SuppressWarnings("unchecked")
  @Override
  public void prepareToWrite(RecordWriter writer) throws IOException {
    this.writer = writer;
  }

  // Strips everything from the first "{...}" path component onwards.
  // (Currently not referenced anywhere; left over from when the template was part of the store location.)
  private String trimPath(String path) {
    StringBuilder sb = new StringBuilder();
    String[] parts = path.split("/");
    for (String part : parts) {
      if (part.contains("{") && part.contains("}")) {
        break;
      }
      sb.append(part);
      sb.append("/");
    }
    if (sb.lastIndexOf("/") >= sb.length() - 1) {
      sb.deleteCharAt(sb.lastIndexOf("/"));
    }
    return sb.toString();
  }

  @Override
  public void setStoreLocation(String location, Job job) throws IOException {
    job.getConfiguration().set("mapred.textoutputformat.separator", "");
    FileOutputFormat.setOutputPath(job, new Path(location));
    if (comp == Compression.bz2 || comp == Compression.bz) {
      FileOutputFormat.setCompressOutput(job, true);
      FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
    } else if (comp == Compression.gz) {
      FileOutputFormat.setCompressOutput(job, true);
      FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }
  }

  //--------------------------------------------------------------------------
  // Implementation of OutputFormat
  // This class handles the details of how the output is serialized from tuple to disk.

  public static class MultiStorageOutputFormat extends TextOutputFormat<String, Tuple> {

    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
    static {
      NUMBER_FORMAT.setMinimumIntegerDigits(5);
      NUMBER_FORMAT.setGroupingUsed(false);
    }

    private String keyValueSeparator = "\\t";
    private byte fieldDel = '\t';

    @Override
    public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
      // Overriding the superclass method so the output directory is not checked for
      // existence, since we don't want to store anything there anyway.
    }

    @Override
    public RecordWriter<String, Tuple> getRecordWriter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      final TaskAttemptContext ctx = context;

      return new RecordWriter<String, Tuple>() {

        // One underlying writer per distinct output path (key).
        private final Map<String, MyLineRecordWriter> storeMap = new HashMap<String, MyLineRecordWriter>();

        private static final int BUFFER_SIZE = 1024;
        private ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);

        @Override
        public void write(String key, Tuple val) throws IOException {
          int sz = val.size();
          for (int i = 0; i < sz; i++) {
            Object field;
            try {
              field = val.get(i);
            } catch (ExecException ee) {
              throw ee;
            }
            StorageUtil.putField(mOut, field);
            if (i != sz - 1) {
              mOut.write(fieldDel);
            }
          }
          getStore(key).write(null, new Text(mOut.toByteArray()));
          mOut.reset();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException {
          for (MyLineRecordWriter out : storeMap.values()) {
            out.close(context);
          }
        }

        // TODO: figure out if part is a map or reduce part
        private MyLineRecordWriter getStore(String path) throws IOException {
          return getStore(path, "part");
        }

        private MyLineRecordWriter getStore(String path, String part_name) throws IOException {
          MyLineRecordWriter store = storeMap.get(path);
          if (store == null) {
            DataOutputStream os = createOutputStream(path, part_name);
            store = new MyLineRecordWriter(os, String.valueOf(fieldDel));
            storeMap.put(path, store);
          }
          return store;
        }

        private DataOutputStream createOutputStream(String path, String part_name) throws IOException {
          Configuration conf = ctx.getConfiguration();
          // This part generates the actual file name based on the tuple value.
          Path workOutputPath = ((FileOutputCommitter) getOutputCommitter(ctx)).getWorkPath();
          TaskID taskId = ctx.getTaskAttemptID().getTaskID();
          // Path lhs is parent, rhs is child
          char m_or_r = taskId.isMap() ? 'm' : 'r';
          Path file = new Path(path, part_name + '-' + m_or_r + '-' + NUMBER_FORMAT.format(taskId.getId()));
          Path output = new Path(workOutputPath, file);
          System.err.println("path is " + path);
          FileSystem fs = file.getFileSystem(conf);
          FSDataOutputStream fileOut = fs.create(output, false);
          return fileOut;
        }
      };
    }

    public void setKeyValueSeparator(String sep) {
      fieldDel = StorageUtil.parseFieldDel(sep);
    }

    //------------------------------------------------------------------------

    protected static class MyLineRecordWriter extends TextOutputFormat.LineRecordWriter<WritableComparable, Text> {
      public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        super(out, keyValueSeparator);
      }
    }
  }
}

On Tue, Nov 8, 2011 at 11:57 AM, Raghu Angadi <[email protected]> wrote:
> The path is certainly case sensitive. Not sure why this file already
> exists. You could post the relevant implementation here.
>
> On Tue, Nov 8, 2011 at 10:41 AM, felix gao <[email protected]> wrote:
> > Raghu,
> >
> > I changed the code to what you suggested, but I got an exception when I
> > try to store.
> > java.io.IOException: File already exists:
> > file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-a/part-r-00000
> >     at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:228)
> >     at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:335)
> >     at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:368)
> >     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:484)
> >     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:465)
> >     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:372)
> >     at com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.createOutputStream(BkMultiStorage.java:325)
> >     at com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.getStore(BkMultiStorage.java:304)
> >     at com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.getStore(BkMultiStorage.java:298)
> >     at com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.write(BkMultiStorage.java:285)
> >     at com.bluekai.analytics.pig.storage.BkMultiStorage$MultiStorageOutputFormat$1.write(BkMultiStorage.java:261)
> >     at com.bluekai.analytics.pig.storage.BkMultiStorage.putNext(BkMultiStorage.java:184)
> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:138)
> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:97)
> >     at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:508)
> >     at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.processOnePackageOutput(PigMapReduce.java:395)
> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:381)
> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce$Reduce.reduce(PigMapReduce.java:250)
> >     at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
> >     at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
> >     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
> >     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)
> >
> > where prefix-a is dynamically generated based on my tuple.
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-0/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-1/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-2/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-3/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-4/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-5/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-6/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-7/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-8/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-9/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-A/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-B/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-C/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-D/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-E/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-F/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-G/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-H/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-I/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-J/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-K/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-L/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-M/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-N/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-O/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-P/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-Q/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-R/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-S/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-T/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-U/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-V/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-W/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-X/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-Y/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-Z/part-r-00000
> > final output stores at file:/user/bbuda/ids/_temporary/_attempt_local_0001_r_000000_0/prefix-a/part-r-00000
> >
> > I am wondering if it is because the path is case insensitive?
> >
> > Thanks,
> >
> > Felix
> >
> > On Fri, Nov 4, 2011 at 3:31 PM, Raghu Angadi <[email protected]> wrote:
> > > You need to set the output path to '/Users/felix/Documents/pig/multi_store_output'
> > > in your setStoreLocation().
> > > Alternately, for clarity, you could modify your store udf to be more like:
> > > store load_log INTO '/Users/felix/Documents/pig/multi_store_output' using
> > > MyMultiStorage('ns_{0}/site_{1}', '2,1', '1,2');
> > >
> > > The reason FileOutputFormat needs a real path is that, at run time, hadoop
> > > actually uses a temporary path and then moves the output to the correct path if the
> > > job succeeds.
> > >
> > > Raghu.
> > >
> > > On Thu, Nov 3, 2011 at 9:45 AM, Dmitriy Ryaboy <[email protected]> wrote:
> > > > Don't use FileOutputFormat? Or rather, use something that extends it and
> > > > overrides the validation.
> > > >
> > > > On Wed, Nov 2, 2011 at 3:19 PM, felix gao <[email protected]> wrote:
> > > > > If you don't call that function, Hadoop is going to throw an exception for not
> > > > > having the output set for the job, something like:
> > > > >
> > > > > Caused by: org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
> > > > >     at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:120)
> > > > >     at org.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileVisitor.visit(InputOutputFileValidator.java:87)
> > > > >
> > > > > So I have to set it and then somehow delete it after pig completes.
> > > > >
> > > > > On Wed, Nov 2, 2011 at 3:00 PM, Ashutosh Chauhan <[email protected]> wrote:
> > > > > > Then, don't call FileOutputFormat.setOutputPath(job, new Path(location));
> > > > > > Looks like I am missing something here.
> > > > > >
> > > > > > Ashutosh
> > > > > >
> > > > > > On Wed, Nov 2, 2011 at 14:10, felix gao <[email protected]> wrote:
> > > > > > > Ashutosh,
> > > > > > >
> > > > > > > The problem is I don't want to use that location at all, since I am
> > > > > > > constructing the output location based on the tuple input. The location is just
> > > > > > > a dummy holder for me to substitute the right parameters.
> > > > > > >
> > > > > > > Felix
> > > > > > >
> > > > > > > On Wed, Nov 2, 2011 at 10:47 AM, Ashutosh Chauhan <[email protected]> wrote:
> > > > > > > > Hey Felix,
> > > > > > > >
> > > > > > > > >> The only problem is that in the setStoreLocation function we have to call
> > > > > > > > >> FileOutputFormat.setOutputPath(job, new Path(location));
> > > > > > > >
> > > > > > > > Can't you massage location into the appropriate string you want?
> > > > > > > >
> > > > > > > > Ashutosh
> > > > > > > >
> > > > > > > > On Tue, Nov 1, 2011 at 18:07, felix gao <[email protected]> wrote:
> > > > > > > > > I have written a custom store function that is primarily based on the
> > > > > > > > > multi-storage store function. The way I use it is
> > > > > > > > >
> > > > > > > > > store load_log INTO
> > > > > > > > > '/Users/felix/Documents/pig/multi_store_output/ns_{0}/site_{1}' using
> > > > > > > > > MyMultiStorage('2,1', '1,2');
> > > > > > > > >
> > > > > > > > > where {0} and {1} will be substituted with the tuple index at 0 and the index
> > > > > > > > > at 1. Everything is fine and all the data is written to the correct place.
> > > > > > > > > The only problem is that in the setStoreLocation function we have to call
> > > > > > > > > FileOutputFormat.setOutputPath(job, new Path(location)); I have
> > > > > > > > > 'Users/felix/Documents/pig/multi_store_output/ns_{0}/site_{1}' as my output
> > > > > > > > > location, so there is actually a folder created in my fs with ns_{0}
> > > > > > > > > and site_{1}. Is there a way to tell hadoop not to create those output
> > > > > > > > > directories?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Felix
