Author: pradeepkth
Date: Thu Nov  5 21:44:24 2009
New Revision: 833193

URL: http://svn.apache.org/viewvc?rev=833193&view=rev
Log:
PIG-958: Splitting output data on key field (ankur via pradeepkth)

Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
Modified:
    hadoop/pig/trunk/CHANGES.txt

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=833193&r1=833192&r2=833193&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Nov  5 21:44:24 2009
@@ -26,6 +26,8 @@
 
 IMPROVEMENTS
 
+PIG-958: Splitting output data on key field (ankur via pradeepkth)
+
 PIG-1058: FINDBUGS: remaining "Correctness Warnings" (olgan)
 
 PIG-1036: Fragment-replicate left outer join (ankit.modi via pradeepkth)

Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java?rev=833193&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java Thu Nov  5 21:44:24 2009
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.Tuple;
+import org.apache.tools.bzip2r.CBZip2OutputStream;
+
+/**
+ * This UDF is useful for splitting the output data into a set of directories
+ * and files dynamically, based on a user-specified key field in the output
+ * tuple.
+ * 
+ * Sample usage: <code>
+ * A = LOAD 'mydata' USING PigStorage() as (a, b, c);
+ * STORE A INTO '/my/home/output' USING MultiStorage('/my/home/output', '0', 'bz2', '\\t');
+ * </code>
+ * 
+ * Parameter details:
+ * <b>/my/home/output</b> (Required) : The DFS path where output directories
+ * and files will be created.
+ * <b>0</b> (Required) : Index of the field whose values should be used to
+ * create directories and files (field 'a' in this case).
+ * <b>'bz2'</b> (Optional) : The compression type. Default is 'none'.
+ * Supported types are 'none', 'gz' and 'bz2'.
+ * <b>'\\t'</b> (Optional) : Output field separator.
+ * 
+ * Let 'a1' and 'a2' be the unique values of field 'a'. The output may then
+ * look like this:
+ * 
+ * /my/home/output/a1/a1-0000 /my/home/output/a1/a1-0001
+ * /my/home/output/a1/a1-0002 ... /my/home/output/a2/a2-0000
+ * /my/home/output/a2/a2-0001 /my/home/output/a2/a2-0002
+ * 
+ * The suffix '0000*' is the task-id of the mapper/reducer task executing this
+ * store. If the user does a GROUP BY on the key field before MultiStorage(),
+ * all tuples for a particular group are guaranteed to go to exactly one
+ * reducer, so in that case there will be only one file each under the 'a1'
+ * and 'a2' directories.
+ */
+public class MultiStorage extends Utf8StorageConverter implements StoreFunc {
+
+  // map of all (key-field-values, PigStorage) received by this store
+  private Map<String, PigStorage> storeMap;
+  private List<OutputStream> outStreamList; // list of all open streams
+  private boolean isAbsolute; // Is the user specified output path absolute
+  private String partition; // Reduce partition ID executing this store
+  private Path outputPath; // User specified output Path
+  private Path workOutputPath; // Task specific temporary output path
+  private Compression comp; // Compression type of output data.
+  private int splitFieldIndex = -1; // Index of the key field
+  private String fieldDel; // delimiter of the output record.
+  private FileSystem fs; // Output file system
+
+  // filter for removing hidden files in a listing
+  public static final PathFilter hiddenPathFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  // Compression types supported by this store
+  enum Compression {
+    none, bz2, bz, gz;
+  };
+
+  public MultiStorage(String parentPathStr, String splitFieldIndex) {
+    this(parentPathStr, splitFieldIndex, "none");
+  }
+
+  public MultiStorage(String parentPathStr, String splitFieldIndex,
+      String compression) {
+    this(parentPathStr, splitFieldIndex, compression, "\\t");
+  }
+
+  /**
+   * Constructor
+   * 
+   * @param parentPathStr
+   *          Parent output dir path
+   * @param splitFieldIndex
+   *          key field index
+   * @param compression
+   *          'bz2', 'bz', 'gz' or 'none'
+   * @param fieldDel
+   *          Output record field delimiter.
+   */
+  public MultiStorage(String parentPathStr, String splitFieldIndex,
+      String compression, String fieldDel) {
+    this.outputPath = new Path(parentPathStr);
+    // remember whether the user-specified path is absolute (used in finish())
+    this.isAbsolute = this.outputPath.isAbsolute();
+    this.splitFieldIndex = Integer.parseInt(splitFieldIndex);
+    this.fieldDel = fieldDel;
+    this.storeMap = new HashMap<String, PigStorage>();
+    this.outStreamList = new ArrayList<OutputStream>();
+    try {
+      this.comp = (compression == null) ? Compression.none : Compression
+          .valueOf(compression.toLowerCase());
+    } catch (IllegalArgumentException e) {
+      System.err.println("Exception when converting compression string: "
+          + compression + " to enum. No compression will be used");
+      this.comp = Compression.none;
+    }
+  }
+
+  /**
+   * Return the work output path suffixed with the parent output dir name.
+   * 
+   * @param conf the job configuration
+   * @return the task-specific work output path, or the parent output path if
+   *         conf is null
+   * @throws IOException
+   */
+  private Path getWorkOutputPath(JobConf conf) throws IOException {
+    Path outPath = (conf != null) ? new Path(FileOutputFormat
+        .getWorkOutputPath(conf), this.outputPath) : this.outputPath;
+    return outPath;
+  }
+
+  /**
+   * Get the zero-padded partition number of the map/reduce task in which this
+   * store is executing.
+   * 
+   * @param conf the job configuration
+   * @return the partition number formatted as a five-digit string
+   */
+  private String getPartition(JobConf conf) {
+    int part = (conf != null) ? conf.getInt("mapred.task.partition", -1) : 0;
+    NumberFormat numberFormat = NumberFormat.getInstance();
+    numberFormat.setMinimumIntegerDigits(5);
+    numberFormat.setGroupingUsed(false);
+    return numberFormat.format(part);
+  }
+
+  /**
+   * Hack to get the unique ID of the map/reduce task in which this store is
+   * running. Also get the outputPath of the job, to be used as the base path
+   * under which field-value-specific sub-directories will be created.
+   * 
+   * @throws IOException
+   */
+  private void initJobSpecificParams() throws IOException {
+    this.partition = (this.partition == null) ? getPartition(PigMapReduce.sJobConf)
+        : this.partition;
+    // workOutputPath = workOutputPath/outputPath. Later we will remove the
+    // suffix.
+    this.workOutputPath = (this.workOutputPath == null) ? getWorkOutputPath(PigMapReduce.sJobConf)
+        : this.workOutputPath;
+    if (this.fs == null) {
+      this.fs = (PigMapReduce.sJobConf == null) ? FileSystem
+          .getLocal(new Configuration()) : FileSystem
+          .get(PigMapReduce.sJobConf);
+    }
+  }
+
+  @Override
+  public void bindTo(OutputStream os) throws IOException {
+    // Nothing to bind here as we will be writing each tuple into a split
+    // based on its schema
+  }
+
+  /**
+   * Create an appropriate output stream for the fieldValue.
+   * 
+   * @param fieldValue the key field value that names the output file
+   * @return an output stream, wrapped in the configured compression codec
+   * @throws IOException
+   */
+  private OutputStream createOutputStream(String fieldValue) throws IOException {
+    Path path = new Path(fieldValue, fieldValue + '-' + partition);
+    Path fieldValueBasedPath = new Path(workOutputPath, path);
+    OutputStream os = null;
+    switch (comp) {
+    case bz:
+    case bz2:
+      os = fs.create(fieldValueBasedPath.suffix(".bz2"), false);
+      os = new CBZip2OutputStream(os);
+      break;
+    case gz:
+      os = fs.create(fieldValueBasedPath.suffix(".gz"), false);
+      os = new GZIPOutputStream(os);
+      break;
+    case none:
+      os = fs.create(fieldValueBasedPath, false);
+    }
+    return os;
+  }
+
+  /**
+   * Retrieve the PigStorage instance corresponding to the field value,
+   * creating one (bound to a fresh output stream) on first use.
+   * 
+   * @param fieldValue the key field value
+   * @return the cached or newly created PigStorage instance
+   * @throws IOException
+   */
+  private PigStorage getStore(String fieldValue) throws IOException {
+    PigStorage store = storeMap.get(fieldValue);
+    if (store == null) {
+      store = new PigStorage(fieldDel);
+      OutputStream os = createOutputStream(fieldValue);
+      store.bindTo(os);
+      outStreamList.add(os);
+      storeMap.put(fieldValue, store);
+    }
+    return store;
+  }
+
+  @Override
+  public void putNext(Tuple tuple) throws IOException {
+    initJobSpecificParams();
+    if (tuple.size() <= splitFieldIndex) {
+      throw new IOException("split field index:" + this.splitFieldIndex
+          + " >= tuple size:" + tuple.size());
+    }
+    Object field = null;
+    try {
+      field = tuple.get(splitFieldIndex);
+    } catch (ExecException exec) {
+      throw new IOException(exec);
+    }
+    PigStorage store = getStore(String.valueOf(field));
+    store.putNext(tuple);
+  }
+
+  /**
+   * Flush the output streams and call pigStorage.finish() for each pigStorage
+   * object. Clear the map of pigStorage objects and move the final results to
+   * the correct location from the temporary output path, since the multiquery
+   * implementation might ignore our results.
+   * 
+   * @throws IOException
+   */
+  @Override
+  public void finish() throws IOException {
+    Collection<PigStorage> pigStores = storeMap.values();
+    for (PigStorage store : pigStores) {
+      store.finish();
+    }
+    storeMap.clear();
+    for (OutputStream os : outStreamList) {
+      os.flush();
+      os.close();
+    }
+    outStreamList.clear();
+    // move the results here
+    if (PigMapReduce.sJobConf != null) {
+      Path rem = FileOutputFormat.getWorkOutputPath(PigMapReduce.sJobConf);
+      String pathToRemove = rem.toUri().getPath() + (!isAbsolute ? "/" : "");
+      moveResults(workOutputPath, pathToRemove);
+    }
+  }
+
+  /**
+   * Moves the files and directories under the given path 'p' to the actual
+   * output path. The method traverses the workOutputPath recursively and
+   * renames the files and directories by removing 'rem' from their path
+   * names.
+   * 
+   * @param p
+   *          the path whose contents are to be moved
+   * @param rem
+   *          the work-output path prefix to strip from the path names
+   * @throws IOException
+   */
+  private void moveResults(Path p, String rem) throws IOException {
+    for (FileStatus fstat : fs.listStatus(p, hiddenPathFilter)) {
+      Path src = fstat.getPath();
+      Path dst = new Path(src.toUri().getPath().replace(rem, ""));
+      if (fstat.isDir()) {
+        fs.mkdirs(dst);
+        moveResults(src, rem);
+      } else {
+        fs.rename(src, dst);
+      }
+    }
+  }
+
+  // @Override
+  public Class getStorePreparationClass() throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+}
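
A standalone sketch (not part of this commit; the class name and values are
illustrative) of how getPartition() and createOutputStream() above combine to
name the per-key output files: the task partition id is zero-padded to five
digits and appended to the key field value, with a compression-specific
extension.

    import java.text.NumberFormat;

    // Illustrative only: mirrors MultiStorage's file-naming scheme.
    public class MultiStorageNamingSketch {
      public static void main(String[] args) {
        NumberFormat nf = NumberFormat.getInstance();
        nf.setMinimumIntegerDigits(5); // pad the partition id to 5 digits
        nf.setGroupingUsed(false);     // avoid "1,234"-style separators

        String fieldValue = "apple";   // hypothetical key field value
        int taskPartition = 3;         // hypothetical mapred.task.partition
        String ext = ".bz2";           // extension added for bz2 compression

        // <outputPath>/<fieldValue>/<fieldValue>-<partition><ext>
        String relative = fieldValue + "/" + fieldValue + "-"
            + nf.format(taskPartition) + ext;
        System.out.println(relative);  // prints: apple/apple-00003.bz2
      }
    }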
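Likewise, a small sketch (again not part of the commit, names are made up) of
the constructor's compression handling: the string argument is lower-cased and
mapped onto the Compression enum, and unsupported values fall back to 'none'.

    // Illustrative only: mirrors the constructor's compression parsing.
    public class CompressionParseSketch {
      enum Compression { none, bz2, bz, gz }

      static Compression parse(String s) {
        if (s == null) return Compression.none;
        try {
          return Compression.valueOf(s.toLowerCase());
        } catch (IllegalArgumentException e) {
          return Compression.none; // unsupported type: no compression
        }
      }

      public static void main(String[] args) {
        System.out.println(parse("BZ2")); // bz2
        System.out.println(parse("lzo")); // none (unsupported)
        System.out.println(parse(null));  // none
      }
    }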

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java?rev=833193&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java Thu Nov  5 21:44:24 2009
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.test.MiniCluster;
+import org.apache.pig.test.Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestMultiStorage extends TestCase {
+  private static final String INPUT_FILE = "MultiStorageInput.txt";
+
+  private PigServer pigServer;
+  private PigServer pigServerLocal;
+
+  private MiniCluster cluster = MiniCluster.buildCluster();
+
+  public TestMultiStorage() throws ExecException, IOException {
+    pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    pigServerLocal = new PigServer(ExecType.LOCAL);
+  }
+
+  public static final PathFilter hiddenPathFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  private void createFile() throws IOException {
+    PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+    w.println("100\tapple\taaa1");
+    w.println("200\torange\tbbb1");
+    w.println("300\tstrawberry\tccc1");
+
+    w.println("101\tapple\taaa2");
+    w.println("201\torange\tbbb2");
+    w.println("301\tstrawberry\tccc2");
+
+    w.println("102\tapple\taaa3");
+    w.println("202\torange\tbbb3");
+    w.println("302\tstrawberry\tccc3");
+
+    w.close();
+    Util.deleteFile(cluster, INPUT_FILE);
+    Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    createFile();
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Path localOut = new Path("local-out");
+    Path dummy = new Path("dummy");
+    if (fs.exists(localOut)) {
+      fs.delete(localOut, true);
+    }
+    if (fs.exists(dummy)) {
+      fs.delete(dummy, true);
+    }
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    new File(INPUT_FILE).delete();
+    Util.deleteFile(cluster, INPUT_FILE);
+  }
+
+  enum Mode {
+    local, cluster
+  };
+
+  @Test
+  public void testMultiStorage() throws IOException {
+    final String LOAD = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);";
+    final String MULTI_STORE_CLUSTER = "STORE A INTO 'mr-out' USING "
+        + "org.apache.pig.piggybank.storage.MultiStorage('mr-out', '1');";
+    final String MULTI_STORE_LOCAL = "STORE A INTO 'dummy' USING "
+        + "org.apache.pig.piggybank.storage.MultiStorage('local-out', '1');";
+
+    System.out.print("Testing in LOCAL mode: ...");
+    //testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL);
+    System.out.println("Succeeded!");
+    
+    System.out.print("Testing in CLUSTER mode: ...");
+    testMultiStorage( Mode.cluster, "mr-out", LOAD, MULTI_STORE_CLUSTER);
+    System.out.println("Succeeded!");
+    
+    
+  }
+
+  /**
+   * The actual method that runs the test in local or cluster mode.
+   * 
+   * @param mode
+   * @param outPath
+   * @param queries
+   * @throws IOException
+   */
+  private void testMultiStorage(Mode mode, String outPath,
+      String... queries) throws IOException {
+    PigServer pigServer = (Mode.local == mode) ? this.pigServerLocal : this.pigServer;
+    pigServer.setBatchOn();
+    for (String query : queries) {
+      pigServer.registerQuery(query);
+    }
+    pigServer.executeBatch();
+    verifyResults(mode, outPath);
+  }
+
+  /**
+   * Verify that records are split into directories corresponding to the
+   * split field values.
+   * 
+   * @param mode
+   * @param outPath
+   * @throws IOException
+   */
+  private void verifyResults(Mode mode, String outPath) throws IOException {
+    FileSystem fs = (Mode.local == mode ? FileSystem
+        .getLocal(new Configuration()) : cluster.getFileSystem());
+    Path output = new Path(outPath);
+    Assert.assertTrue("Output dir does not exists!", fs.exists(output)
+        && fs.isDirectory(output));
+
+    Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+    Assert.assertTrue("Split field dirs not found!", paths != null);
+
+    for (Path path : paths) {
+      String splitField = path.getName();
+      Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+      Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
+          files != null);
+      for (Path filePath : files) {
+        if (fs.isFile(filePath)) {
+          BufferedReader reader = new BufferedReader(new InputStreamReader(fs
+              .open(filePath)));
+          String line = "";
+          while ((line = reader.readLine()) != null) {
+            String[] fields = line.split("\\t");
+            Assert.assertEquals(3, fields.length);
+            Assert.assertEquals("Unexpected field value in the output record",
+                splitField, fields[1]);
+          }
+          reader.close();
+        }
+      }
+    }
+  }
+}
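
To connect the test data to the verification step: each record such as
'100\tapple\taaa1' is routed on field index 1, so verifyResults() expects every
line under, say, 'mr-out/apple/' to carry 'apple' in its second column. A
standalone sketch of that per-line check (illustrative values only):

    // Illustrative only: the per-line check applied by verifyResults().
    public class SplitFieldCheckSketch {
      public static void main(String[] args) {
        String dirName = "apple";         // hypothetical split-field directory
        String line = "100\tapple\taaa1"; // a record read back from that dir

        String[] fields = line.split("\\t");
        if (fields.length != 3) {
          throw new IllegalStateException("expected 3 fields per record");
        }
        if (!dirName.equals(fields[1])) {
          throw new IllegalStateException("record in wrong directory");
        }
        System.out.println("record OK: " + line);
      }
    }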

