Author: gates
Date: Tue Dec 16 09:21:18 2008
New Revision: 727092

URL: http://svn.apache.org/viewvc?rev=727092&view=rev
Log:
PIG-6.  Add support for loading from hbase.

Added:
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/
    
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
    
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestHBaseStorage.java
Modified:
    hadoop/pig/branches/types/CHANGES.txt
    hadoop/pig/branches/types/build.xml

Modified: hadoop/pig/branches/types/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=727092&r1=727091&r2=727092&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Tue Dec 16 09:21:18 2008
@@ -337,3 +337,5 @@
 
        PIG-556: Changed FindQuantiles to report progress.  Fixed issue with 
null
        reporter being passed to EvalFuncs. (gates)
+
+       PIG-6: Add load support from hbase (hustlmsp via gates).

Modified: hadoop/pig/branches/types/build.xml
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/build.xml?rev=727092&r1=727091&r2=727092&view=diff
==============================================================================
--- hadoop/pig/branches/types/build.xml (original)
+++ hadoop/pig/branches/types/build.xml Tue Dec 16 09:21:18 2008
@@ -45,6 +45,8 @@
     <property name="build.encoding" value="ISO-8859-1" />
     <!-- TODO with only one version of hadoop in the lib folder we do not need 
that anymore -->
     <property name="hadoop.jarfile" value="hadoop18.jar" />
+    <property name="hbase.jarfile" value="hbase-0.18.1.jar" />
+    <property name="hbase.test.jarfile" value="hbase-0.18.1-test.jar" />
 
     <!-- javac properties -->
     <property name="javac.debug" value="on" />
@@ -97,6 +99,8 @@
     <!-- setup the classpath -->
     <path id="classpath">
         <fileset file="${lib.dir}/${hadoop.jarfile}" />
+        <fileset file="${lib.dir}/${hbase.jarfile}" />
+        <fileset file="${lib.dir}/${hbase.test.jarfile}" />
         <fileset file="${lib.dir}/javacc.jar" />
         <fileset file="${lib.dir}/jsch-0.1.33.jar" />
         <fileset file="${lib.dir}/junit-4.1.jar" />

Added: 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java?rev=727092&view=auto
==============================================================================
--- 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
 (added)
+++ 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
 Tue Dec 16 09:21:18 2008
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.pig.Slice;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * HBase Slice to load a portion of range of a table. The key range will be
+ * [start, end) Modeled from org.apache.hadoop.hbase.mapred.TableSplit.
+ */
+public class HBaseSlice implements Slice {
+
+    /** A Generated Serial Version UID **/
+    private static final long serialVersionUID = 9035916017187148965L;
+    private static final Log LOG = LogFactory.getLog(HBaseSlice.class);
+
+    // assigned during construction
+    /** Table Name **/
+    private byte[] m_tableName;
+    /** Table Start Row **/
+    private byte[] m_startRow;
+    /** Table End Row **/
+    private byte[] m_endRow;
+    /** Table Region Location **/
+    private String m_regionLocation;
+    /** Input Columns **/
+    private byte[][] m_inputColumns;
+
+    // created as part of init
+    /** The connection to the table in Hbase **/
+    private transient HTable m_table;
+    /** The scanner over the table **/
+    private transient Scanner m_scanner;
+
+    private transient ArrayList<Object> mProtoTuple;
+
+    /**
+     * Record the last processed row, so that we can restart the scanner when 
an
+     * exception happened during scanning a table
+     */
+    private transient byte[] m_lastRow;
+
+    /**
+     * Constructor
+     * 
+     * @param tableName
+     *            table name
+     * @param startRow
+     *            start now, inclusive
+     * @param endRow
+     *            end row, exclusive
+     * @param inputColumns
+     *            input columns
+     * @param location
+     *            region location
+     */
+    public HBaseSlice(byte[] tableName, byte[] startRow, byte[] endRow,
+            byte[][] inputColumns, final String location) {
+        this.m_tableName = tableName;
+        this.m_startRow = startRow;
+        this.m_endRow = endRow;
+        this.m_inputColumns = inputColumns;
+        this.m_regionLocation = location;
+    }
+
+    /** @return table name */
+    public byte[] getTableName() {
+        return this.m_tableName;
+    }
+
+    /** @return starting row key */
+    public byte[] getStartRow() {
+        return this.m_startRow;
+    }
+
+    /** @return end row key */
+    public byte[] getEndRow() {
+        return this.m_endRow;
+    }
+
+    /** @return input columns */
+    public byte[][] getInputColumns() {
+        return this.m_inputColumns;
+    }
+
+    /** @return the region's hostname */
+    public String getRegionLocation() {
+        return this.m_regionLocation;
+    }
+
+    @Override
+    public long getStart() {
+        // Not clear how to obtain this in a table...
+        return 0;
+    }
+
+    @Override
+    public long getLength() {
+        // Not clear how to obtain this in a table...
+        // it seems to be used only for sorting splits
+        return 0;
+    }
+
+    @Override
+    public String[] getLocations() {
+        return new String[] { m_regionLocation };
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        // This should be the ordinal tuple in the range;
+        // not clear how to calculate...
+        return 0;
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+        // Depends on the total number of tuples and getPos
+        return 0;
+    }
+
+    @Override
+    public void init(DataStorage store) throws IOException {
+        LOG.info("Init Hbase Slice " + this);
+        HBaseConfiguration conf = new HBaseConfiguration();
+        // connect to the given table
+        m_table = new HTable(conf, m_tableName);
+        // init the scanner
+        init_scanner();
+    }
+
+    /**
+     * Init the table scanner
+     * 
+     * @throws IOException
+     */
+    private void init_scanner() throws IOException {
+        restart(m_startRow);
+        m_lastRow = m_startRow;
+    }
+
+    /**
+     * Restart scanning from survivable exceptions by creating a new scanner.
+     * 
+     * @param startRow
+     *            the start row
+     * @throws IOException
+     */
+    private void restart(byte[] startRow) throws IOException {
+        if ((m_endRow != null) && (m_endRow.length > 0)) {
+            this.m_scanner = this.m_table.getScanner(m_inputColumns, startRow,
+                    m_endRow);
+        } else {
+            this.m_scanner = this.m_table.getScanner(m_inputColumns, startRow);
+        }
+    }
+
+    @Override
+    public boolean next(Tuple value) throws IOException {
+        RowResult result;
+        try {
+            result = this.m_scanner.next();
+        } catch (UnknownScannerException e) {
+            LOG.debug("recovered from " + StringUtils.stringifyException(e));
+            restart(m_lastRow);
+            if (m_lastRow != m_startRow) {
+                this.m_scanner.next(); // skip presumed already mapped row
+            }
+            result = this.m_scanner.next();
+        }
+        boolean hasMore = result != null && result.size() > 0;
+        if (hasMore) {
+            m_lastRow = result.getRow();
+            convertResultToTuple(result, value);
+        }
+        return hasMore;
+    }
+
+    /**
+     * Converte a row result to a tuple
+     * 
+     * @param result
+     *            row result
+     * @param tuple
+     *            tuple
+     */
+    private void convertResultToTuple(RowResult result, Tuple tuple) {
+        if (mProtoTuple == null)
+            mProtoTuple = new ArrayList<Object>();
+
+        Cell cell = null;
+        byte[] value = null;
+        for (byte[] column : m_inputColumns) {
+            cell = result.get(column);
+            if (cell == null || (value = cell.getValue()) == null) {
+                mProtoTuple.add(null);
+            } else {
+                mProtoTuple.add(new DataByteArray(value));
+            }
+        }
+
+        Tuple newT = TupleFactory.getInstance().newTuple(mProtoTuple);
+        mProtoTuple.clear();
+        tuple.reference(newT);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (m_scanner != null) {
+            m_scanner.close();
+            m_scanner = null;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return m_regionLocation + ":" + Bytes.toString(m_startRow) + ","
+                + Bytes.toString(m_endRow);
+    }
+
+}

Added: 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=727092&view=auto
==============================================================================
--- 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
 (added)
+++ 
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
 Tue Dec 16 09:21:18 2008
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.Slice;
+import org.apache.pig.Slicer;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * A <code>Slicer</code> that split the hbase table into {...@link 
HBaseSlice}s.
+ * And a load function will provided to do none load operations, the actually
+ * load operatrions will be done in {...@link HBaseSlice}.
+ */
+public class HBaseStorage extends Utf8StorageConverter implements Slicer,
+        LoadFunc {
+
+    private byte[][] m_cols;
+    private HTable m_table;
+    private HBaseConfiguration m_conf;
+
+    private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
+
+    // HBase Slicer
+    // Creates a slice per region of a specified table.
+
+    /**
+     * Constructor. Construct a HBase Table loader to load the cells of the
+     * provided columns.
+     * 
+     * @param columnList
+     *            columnlist that is a presented string delimited by space.
+     */
+    public HBaseStorage(String columnList) {
+        String[] colNames = columnList.split(" ");
+        m_cols = new byte[colNames.length][];
+        for (int i = 0; i < m_cols.length; i++) {
+            m_cols[i] = Bytes.toBytes(colNames[i]);
+        }
+
+        m_conf = new HBaseConfiguration();
+    }
+
+    @Override
+    public Slice[] slice(DataStorage store, String tablename)
+            throws IOException {
+        validate(store, tablename);
+
+        byte[][] startKeys = m_table.getStartKeys();
+        if (startKeys == null || startKeys.length == 0) {
+            throw new IOException("Expecting at least one region");
+        }
+        if (m_cols == null || m_cols.length == 0) {
+            throw new IOException("Expecting at least one column");
+        }
+        // one region one slice
+        Slice[] slices = new Slice[startKeys.length];
+        for (int i = 0; i < startKeys.length; i++) {
+            String regionLocation = m_table.getRegionLocation(startKeys[i])
+                    .getServerAddress().getHostname();
+            slices[i] = new HBaseSlice(m_table.getTableName(), startKeys[i],
+                    ((i + 1) < startKeys.length) ? startKeys[i + 1]
+                            : HConstants.EMPTY_START_ROW, m_cols,
+                    regionLocation);
+            LOG.info("slice: " + i + "->" + slices[i]);
+        }
+
+        return slices;
+    }
+
+    @Override
+    public void validate(DataStorage store, String tablename)
+            throws IOException {
+        ensureTable(tablename);
+    }
+
+    private void ensureTable(String tablename) throws IOException {
+        if (m_table == null) {
+            m_table = new HTable(m_conf, tablename);
+        }
+    }
+
+    // HBase LoadFunc
+    // It is just a mock class to let the UDF be casted to a LOADFUNC during
+    // parsing.
+    
+    @Override
+    public void bindTo(String fileName, BufferedPositionedInputStream is,
+            long offset, long end) throws IOException {
+        // do nothing
+    }
+
+    @Override
+    public Schema determineSchema(String fileName, ExecType execType,
+            DataStorage storage) throws IOException {
+        // do nothing
+        return null;
+    }
+
+    @Override
+    public void fieldsToRead(Schema schema) {
+        // do nothing
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        // do nothing
+        return null;
+    }
+}

Added: hadoop/pig/branches/types/test/org/apache/pig/test/TestHBaseStorage.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestHBaseStorage.java?rev=727092&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestHBaseStorage.java 
(added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestHBaseStorage.java 
Tue Dec 16 09:21:18 2008
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/** {...@link org.apache.pig.backend.hadoop.hbase.HBaseStorage} Test Case **/
+public class TestHBaseStorage extends TestCase {
+
+    private static final Log LOG =
+        LogFactory.getLog(TestHBaseStorage.class);
+    
+    private MiniCluster cluster = MiniCluster.buildCluster();
+    private HBaseConfiguration conf;
+    private MiniHBaseCluster hbaseCluster;
+    
+    private PigServer pig;
+    
+    final static int NUM_REGIONSERVERS = 1;
+    
+    // Test Table Inforamtions
+    private static final String TESTTABLE = "pigtable";
+    private static final String COLUMNFAMILY = "pig:";
+    private static final String TESTCOLUMN_A = "pig:col_a";
+    private static final String TESTCOLUMN_B = "pig:col_b";
+    private static final String TESTCOLUMN_C = "pig:col_c";
+    private static final HColumnDescriptor family =
+        new HColumnDescriptor(COLUMNFAMILY);
+    private static final int TEST_ROW_COUNT = 100;
+    
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        conf = new HBaseConfiguration(ConfigurationUtil.
+             toConfiguration(cluster.getProperties()));
+        try {
+            hBaseClusterSetup();
+        } catch (Exception e) {
+            if(hbaseCluster != null) {
+                hbaseCluster.shutdown();
+            }
+            throw e;
+        }
+        
+        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+    
+    /**
+     * Actually start the MiniHBase instance.
+     */
+    protected void hBaseClusterSetup() throws Exception {
+      // start the mini cluster
+      hbaseCluster = new MiniHBaseCluster(conf, NUM_REGIONSERVERS);
+      // opening the META table ensures that cluster is running
+      new HTable(conf, HConstants.META_TABLE_NAME);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        // clear the table
+        deleteTable();
+        super.tearDown();
+        try {
+            HConnectionManager.deleteConnectionInfo(conf, true);
+            if (hbaseCluster != null) {
+                try {
+                    hbaseCluster.shutdown();
+                } catch (Exception e) {
+                    LOG.warn("Closing mini hbase cluster", e);
+                }
+            }
+        } catch (Exception e) {
+            LOG.error(e);
+        }
+        pig.shutdown();
+    }
+
+    /**
+     * load from hbase test
+     * @throws IOException
+     * @throws ExecException
+     */
+    @Test
+    public void testLoadFromHBase() throws IOException, ExecException {
+        prepareTable();
+        pig.registerQuery("a = load '" + TESTTABLE + "' using " +
+            "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + 
TESTCOLUMN_A + 
+            " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "') as (col_a, 
col_b:int, col_c);");
+        Iterator<Tuple> it = pig.openIterator("a");
+        int count = 0;
+        LOG.info("LoadFromHBase Starting");
+        while(it.hasNext()){
+            Tuple t = it.next();
+            LOG.info("LoadFromHBase "+ t);
+            String col_a = ((DataByteArray)t.get(0)).toString();
+            int col_b = (Integer)t.get(1);
+            String col_c = ((DataByteArray)t.get(2)).toString();
+            
+            assertEquals(String.valueOf(count), col_a);
+            assertEquals(count, col_b);
+            assertEquals("TEXT" + count, col_c);
+            
+            count++;
+        }
+        assertEquals(TEST_ROW_COUNT, count);
+        System.err.println("LoadFromHBase done");
+    }
+    
+    /**
+     * Prepare a table in hbase for testing.
+     * 
+     * @throws IOException
+     */
+    private void prepareTable() throws IOException {
+        // define the table schema
+        HTableDescriptor tabledesc = new HTableDescriptor(TESTTABLE);
+        tabledesc.addFamily(family);
+        
+        // create the table
+        HBaseAdmin admin = new HBaseAdmin(conf);
+        if(admin.tableExists(TESTTABLE)) {
+            deleteTable();
+        }
+        admin.createTable(tabledesc);
+        
+        // put some data into table
+        HTable table = new HTable(conf, TESTTABLE);
+        
+        BatchUpdate batchUpdate;
+        
+        for(int i = 0 ; i < TEST_ROW_COUNT ; i++) {
+            String v = Integer.toString(i);
+            batchUpdate = new BatchUpdate(Bytes.toBytes(
+                "00".substring(v.length()) + v));
+            batchUpdate.put(TESTCOLUMN_A, Bytes.toBytes(v));
+            batchUpdate.put(TESTCOLUMN_B, Bytes.toBytes(v));
+            batchUpdate.put(TESTCOLUMN_C, Bytes.toBytes("TEXT" + i));
+            table.commit(batchUpdate);
+        }
+    }
+    
+    private void deleteTable() throws IOException {
+        // delete the table
+        HBaseAdmin admin = new HBaseAdmin(conf);
+        if(admin.tableExists(TESTTABLE)) {
+            admin.disableTable(TESTTABLE);
+            while(admin.isTableEnabled(TESTTABLE)) {
+                try {
+                    Thread.sleep(3000);
+                } catch (InterruptedException e) {
+                    // do nothing.
+                }
+            }
+            admin.deleteTable(TESTTABLE);
+        }
+    }
+
+}


Reply via email to