Author: pradeepkth
Date: Tue Jan 26 21:38:13 2010
New Revision: 903430

URL: http://svn.apache.org/viewvc?rev=903430&view=rev
Log:
PIG-1200: Using TableInputFormat in HBaseStorage (zjffdu via pradeepkth)
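
For context, a minimal sketch of how the reworked loader is driven end to end. This is illustrative only: the table name "users" and the column names are hypothetical, and the calls simply mirror the new methods in the diff below.

    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.backend.hadoop.hbase.HBaseStorage;

    public class HBaseStorageSketch {
        public static void main(String[] args) throws Exception {
            // The constructor takes a space-delimited column list; each
            // column is added to the Scan that setLocation() later
            // serializes for TableInputFormat.
            HBaseStorage storage = new HBaseStorage("info:name info:age");

            // Pig passes the LOAD location here; the "hbase://" prefix is
            // stripped before the table name reaches TableInputFormat.
            storage.setLocation("hbase://users", new Job());

            // The returned InputFormat is a TableInputFormat configured
            // with INPUT_TABLE and the Base64-encoded SCAN.
            InputFormat inputFormat = storage.getInputFormat();
            System.out.println(inputFormat.getClass().getName());
        }
    }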

Removed:
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
Modified:
    hadoop/pig/branches/load-store-redesign/CHANGES.txt
    hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java

Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=903430&r1=903429&r2=903430&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Tue Jan 26 21:38:13 2010
@@ -22,6 +22,8 @@
 
 INCOMPATIBLE CHANGES
 
+PIG-1200: Using TableInputFormat in HBaseStorage (zjffdu via pradeepkth)
+
 PIG-1148: Move splitable logic from pig latin to InputFormat (zjffdu via
 pradeepkth)
 

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=903430&r1=903429&r2=903430&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Jan 26 21:38:13 2010
@@ -16,155 +16,124 @@
  */
 package org.apache.pig.backend.hadoop.hbase;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.pig.LoadFunc;
-import org.apache.pig.Slice;
-import org.apache.pig.Slicer;
-import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 /**
  * A <code>Slicer</code> that splits the HBase table into {@link HBaseSlice}s.
  * A load function is provided that performs no load operations; the actual
  * load operations are done in {@link HBaseSlice}.
  */
-public class HBaseStorage extends LoadFunc implements Slicer {
+public class HBaseStorage extends LoadFunc {
 
-    private byte[][] m_cols;
-    private HTable m_table;
-    private HBaseConfiguration m_conf;
-
-    private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
-
-    // HBase Slicer
-    // Creates a slice per region of a specified table.
-
-    /**
-     * Constructor. Construct a HBase Table loader to load the cells of the
-     * provided columns.
-     * 
-     * @param columnList
-     *            columnlist that is a presented string delimited by space.
-     */
-    public HBaseStorage(String columnList) {
-        String[] colNames = columnList.split(" ");
-        m_cols = new byte[colNames.length][];
-        for (int i = 0; i < m_cols.length; i++) {
-            m_cols[i] = Bytes.toBytes(colNames[i]);
-        }
-
-        m_conf = new HBaseConfiguration();
-    }
-
-    @Override
-    public Slice[] slice(DataStorage store, String tablename)
-            throws IOException {
-        validate(store, tablename);
-
-        byte[][] startKeys = m_table.getStartKeys();
-        if (startKeys == null || startKeys.length == 0) {
-            throw new IOException("Expecting at least one region");
-        }
-        if (m_cols == null || m_cols.length == 0) {
-            throw new IOException("Expecting at least one column");
-        }
-        // one region one slice
-        Slice[] slices = new Slice[startKeys.length];
-        for (int i = 0; i < startKeys.length; i++) {
-            String regionLocation = m_table.getRegionLocation(startKeys[i])
-                    .getServerAddress().getHostname();
-            slices[i] = new HBaseSlice(m_table.getTableName(), startKeys[i],
-                    ((i + 1) < startKeys.length) ? startKeys[i + 1]
-                            : HConstants.EMPTY_START_ROW, m_cols,
-                    regionLocation);
-            LOG.info("slice: " + i + "->" + slices[i]);
-        }
+       private byte[][] m_cols;
+       private HTable m_table;
+       private Configuration m_conf = new Configuration();
+       private RecordReader reader;
+       private Scan scan = new Scan();
+       
+       private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
+
+       // HBase loader.
+       // TableInputFormat creates one input split per region of the table.
+
+       /**
+        * Constructor. Constructs an HBase table loader that loads the cells
+        * of the provided columns.
+        * 
+        * @param columnList
+        *            a space-delimited list of column names.
+        */
+       public HBaseStorage(String columnList) {
+               String[] colNames = columnList.split(" ");
+               m_cols = new byte[colNames.length][];
+               for (int i = 0; i < m_cols.length; i++) {
+                       m_cols[i] = Bytes.toBytes(colNames[i]);
+                       scan.addColumn(m_cols[i]);
+               }               
+       }
+
+
+       @Override
+       public Tuple getNext() throws IOException {
+               try {
+                       if (reader.nextKeyValue()) {
+                               ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
+                                               .getCurrentKey();
+                               Result result = (Result) reader.getCurrentValue();
+                               Tuple tuple = TupleFactory.getInstance().newTuple(m_cols.length);
+                               for (int i = 0; i < m_cols.length; ++i) {
+                                       tuple.set(i, new DataByteArray(result.getValue(m_cols[i])));
+                               }
+                               return tuple;
+                       }
+               } catch (InterruptedException e) {
+                       throw new IOException(e);
+               }
+               return null;
+       }
+
+       @Override
+       public InputFormat getInputFormat() {
+               TableInputFormat inputFormat = new TableInputFormat();
+               inputFormat.setConf(m_conf);
+               return inputFormat;
+       }
+
+       @Override
+       public void prepareToRead(RecordReader reader, PigSplit split) {
+               this.reader = reader;
+       }
 
-        return slices;
-    }
-
-    @Override
-    public void validate(DataStorage store, String tablename)
-            throws IOException {
-        ensureTable(tablename);
-    }
-
-    private void ensureTable(String tablename) throws IOException {
-        LOG.info("tablename: "+tablename);
-
-        // We're looking for the right scheme here (actually, we don't
-        // care what the scheme is as long as it is one and it's
-        // different from hdfs and file. If the user specified to use
-        // the multiquery feature and did not specify a scheme we will
-        // have transformed it to an absolute path. In that case we'll
-        // take the last component and guess that's what was
-        // meant. We'll print a warning in that case.
-        int index;
-        if(-1 != (index = tablename.indexOf("://"))) {
-            if (tablename.startsWith("hdfs:") 
-                || tablename.startsWith("file:")) {
-                index = tablename.lastIndexOf("/");
-                if (-1 == index) {
-                    index = tablename.lastIndexOf("\\");
-                }
-
-                if (-1 == index) {
-                    throw new IOException("Got tablename: "+tablename
-                        +". Either turn off multiquery (-no_multiquery)"
-                        +" or specify load path as \"hbase://<tablename>\".");
-                } else {
-                    String in = tablename;
-                    tablename = tablename.substring(index+1);
-                    LOG.warn("Got tablename: "+in+" Assuming you meant table: "
-                     +tablename+". Either turn off multiquery (-no_multiquery) "
-                     +"or specify load path as \"hbase://<tablename>\" "
-                             +"to avoid this warning.");
-                }
-            } else {
-                tablename = tablename.substring(index+3);
-            }
-        }
-
-        if (m_table == null) {
-            m_table = new HTable(m_conf, tablename);
-        }
-    }
-
-    // HBase LoadFunc
-    // It is just a mock class to let the UDF be casted to a LOADFUNC during
-    // parsing.
-    
-
-    @Override
-    public Tuple getNext() throws IOException {
-        // do nothing
-        return null;
-    }
-
-    @Override
-    public InputFormat getInputFormat() {
-        return null;
-    }
-
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
+       @Override
     public void setLocation(String location, Job job) throws IOException {
-        throw new UnsupportedOperationException();
-    }
+        if (location.startsWith("hbase://")) {
+            m_conf.set(TableInputFormat.INPUT_TABLE, location.substring(8));
+        } else {
+            m_conf.set(TableInputFormat.INPUT_TABLE, location);
+        }
+        m_conf.set(TableInputFormat.SCAN, convertScanToString(scan));
+    }
+
+       @Override
+       public String relativeToAbsolutePath(String location, Path curDir)
+                       throws IOException {
+               return location;
+       }
+       
+       private static String convertScanToString(Scan scan) {
+
+               try {
+                       ByteArrayOutputStream out = new ByteArrayOutputStream();
+                       DataOutputStream dos = new DataOutputStream(out);
+                       scan.write(dos);
+                       return Base64.encodeBytes(out.toByteArray());
+               } catch (IOException e) {
+                       LOG.error(e);
+                       return "";
+               }
+
+       }
 }
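
For completeness, a hedged sketch of the Scan serialization round-trip, assuming the HBase 0.20-era Writable API: the encode half is what convertScanToString() above does, and the decode half mirrors what TableInputFormat does when it reads the SCAN property back out of the configuration. The column name is hypothetical.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Base64;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanRoundTripSketch {
        public static void main(String[] args) throws Exception {
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("info:name")); // hypothetical column

            // Encode: Writable-serialize the Scan, then Base64 the bytes so
            // the result survives as a plain String in the configuration.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            scan.write(new DataOutputStream(out));
            String encoded = Base64.encodeBytes(out.toByteArray());

            // Decode: rebuild an equivalent Scan from the encoded String.
            Scan decoded = new Scan();
            decoded.readFields(new DataInputStream(
                    new ByteArrayInputStream(Base64.decode(encoded))));
            System.out.println("encoded scan: " + encoded);
        }
    }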

