Author: gates
Date: Tue Dec 16 09:21:18 2008
New Revision: 727092
URL: http://svn.apache.org/viewvc?rev=727092&view=rev
Log:
PIG-6. Add support for loading from hbase.
Added:
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestHBaseStorage.java
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/build.xml
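
For orientation, the new loader is invoked from Pig's LOAD ... USING syntax with a space-delimited list of HBase columns. Below is a minimal usage sketch driven through PigServer; it mirrors the TestHBaseStorage case further down in this patch, and the table name 'pigtable' and the 'pig:col_*' column names are hypothetical.

import java.util.Iterator;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

// Sketch only: loads a hypothetical HBase table through the new HBaseStorage loader.
public class HBaseLoadSketch {
    public static void main(String[] args) throws Exception {
        PigServer pig = new PigServer(ExecType.MAPREDUCE);
        // Columns are passed to HBaseStorage as a single space-delimited string.
        pig.registerQuery("a = load 'pigtable' using "
                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage("
                + "'pig:col_a pig:col_b pig:col_c') as (col_a, col_b:int, col_c);");
        Iterator<Tuple> it = pig.openIterator("a");
        while (it.hasNext()) {
            System.out.println(it.next()); // one tuple per HBase row
        }
    }
}
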
Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=727092&r1=727091&r2=727092&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Tue Dec 16 09:21:18 2008
@@ -337,3 +337,5 @@
PIG-556: Changed FindQuantiles to report progress. Fixed issue with null
reporter being passed to EvalFuncs. (gates)
+
+ PIG-6: Add load support from hbase (hustlmsp via gates).
Modified: hadoop/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/build.xml?rev=727092&r1=727091&r2=727092&view=diff
==============================================================================
--- hadoop/pig/branches/types/build.xml (original)
+++ hadoop/pig/branches/types/build.xml Tue Dec 16 09:21:18 2008
@@ -45,6 +45,8 @@
<property name="build.encoding" value="ISO-8859-1" />
<!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
<property name="hadoop.jarfile" value="hadoop18.jar" />
+ <property name="hbase.jarfile" value="hbase-0.18.1.jar" />
+ <property name="hbase.test.jarfile" value="hbase-0.18.1-test.jar" />
<!-- javac properties -->
<property name="javac.debug" value="on" />
@@ -97,6 +99,8 @@
<!-- setup the classpath -->
<path id="classpath">
<fileset file="${lib.dir}/${hadoop.jarfile}" />
+ <fileset file="${lib.dir}/${hbase.jarfile}" />
+ <fileset file="${lib.dir}/${hbase.test.jarfile}" />
<fileset file="${lib.dir}/javacc.jar" />
<fileset file="${lib.dir}/jsch-0.1.33.jar" />
<fileset file="${lib.dir}/junit-4.1.jar" />
Added:
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java?rev=727092&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java Tue Dec 16 09:21:18 2008
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.pig.Slice;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * HBase Slice to load a portion (a key range) of a table. The key range is
+ * [start, end). Modeled on org.apache.hadoop.hbase.mapred.TableSplit.
+ */
+public class HBaseSlice implements Slice {
+
+ /** A Generated Serial Version UID **/
+ private static final long serialVersionUID = 9035916017187148965L;
+ private static final Log LOG = LogFactory.getLog(HBaseSlice.class);
+
+ // assigned during construction
+ /** Table Name **/
+ private byte[] m_tableName;
+ /** Table Start Row **/
+ private byte[] m_startRow;
+ /** Table End Row **/
+ private byte[] m_endRow;
+ /** Table Region Location **/
+ private String m_regionLocation;
+ /** Input Columns **/
+ private byte[][] m_inputColumns;
+
+ // created as part of init
+ /** The connection to the table in Hbase **/
+ private transient HTable m_table;
+ /** The scanner over the table **/
+ private transient Scanner m_scanner;
+
+ private transient ArrayList<Object> mProtoTuple;
+
+ /**
+ * Record the last processed row, so that we can restart the scanner when an
+ * exception happens while scanning the table
+ */
+ private transient byte[] m_lastRow;
+
+ /**
+ * Constructor
+ *
+ * @param tableName
+ * table name
+ * @param startRow
+ * start row, inclusive
+ * @param endRow
+ * end row, exclusive
+ * @param inputColumns
+ * input columns
+ * @param location
+ * region location
+ */
+ public HBaseSlice(byte[] tableName, byte[] startRow, byte[] endRow,
+ byte[][] inputColumns, final String location) {
+ this.m_tableName = tableName;
+ this.m_startRow = startRow;
+ this.m_endRow = endRow;
+ this.m_inputColumns = inputColumns;
+ this.m_regionLocation = location;
+ }
+
+ /** @return table name */
+ public byte[] getTableName() {
+ return this.m_tableName;
+ }
+
+ /** @return starting row key */
+ public byte[] getStartRow() {
+ return this.m_startRow;
+ }
+
+ /** @return end row key */
+ public byte[] getEndRow() {
+ return this.m_endRow;
+ }
+
+ /** @return input columns */
+ public byte[][] getInputColumns() {
+ return this.m_inputColumns;
+ }
+
+ /** @return the region's hostname */
+ public String getRegionLocation() {
+ return this.m_regionLocation;
+ }
+
+ @Override
+ public long getStart() {
+ // Not clear how to obtain this in a table...
+ return 0;
+ }
+
+ @Override
+ public long getLength() {
+ // Not clear how to obtain this in a table...
+ // it seems to be used only for sorting splits
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() {
+ return new String[] { m_regionLocation };
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ // This should be the ordinal tuple in the range;
+ // not clear how to calculate...
+ return 0;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ // Depends on the total number of tuples and getPos
+ return 0;
+ }
+
+ @Override
+ public void init(DataStorage store) throws IOException {
+ LOG.info("Init Hbase Slice " + this);
+ HBaseConfiguration conf = new HBaseConfiguration();
+ // connect to the given table
+ m_table = new HTable(conf, m_tableName);
+ // init the scanner
+ init_scanner();
+ }
+
+ /**
+ * Init the table scanner
+ *
+ * @throws IOException
+ */
+ private void init_scanner() throws IOException {
+ restart(m_startRow);
+ m_lastRow = m_startRow;
+ }
+
+ /**
+ * Restart scanning from survivable exceptions by creating a new scanner.
+ *
+ * @param startRow
+ * the start row
+ * @throws IOException
+ */
+ private void restart(byte[] startRow) throws IOException {
+ if ((m_endRow != null) && (m_endRow.length > 0)) {
+ this.m_scanner = this.m_table.getScanner(m_inputColumns, startRow,
+ m_endRow);
+ } else {
+ this.m_scanner = this.m_table.getScanner(m_inputColumns, startRow);
+ }
+ }
+
+ @Override
+ public boolean next(Tuple value) throws IOException {
+ RowResult result;
+ try {
+ result = this.m_scanner.next();
+ } catch (UnknownScannerException e) {
+ LOG.debug("recovered from " + StringUtils.stringifyException(e));
+ restart(m_lastRow);
+ if (m_lastRow != m_startRow) {
+ this.m_scanner.next(); // skip presumed already mapped row
+ }
+ result = this.m_scanner.next();
+ }
+ boolean hasMore = result != null && result.size() > 0;
+ if (hasMore) {
+ m_lastRow = result.getRow();
+ convertResultToTuple(result, value);
+ }
+ return hasMore;
+ }
+
+ /**
+ * Convert a row result to a tuple
+ *
+ * @param result
+ * row result
+ * @param tuple
+ * tuple
+ */
+ private void convertResultToTuple(RowResult result, Tuple tuple) {
+ if (mProtoTuple == null)
+ mProtoTuple = new ArrayList<Object>();
+
+ Cell cell = null;
+ byte[] value = null;
+ for (byte[] column : m_inputColumns) {
+ cell = result.get(column);
+ if (cell == null || (value = cell.getValue()) == null) {
+ mProtoTuple.add(null);
+ } else {
+ mProtoTuple.add(new DataByteArray(value));
+ }
+ }
+
+ Tuple newT = TupleFactory.getInstance().newTuple(mProtoTuple);
+ mProtoTuple.clear();
+ tuple.reference(newT);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (m_scanner != null) {
+ m_scanner.close();
+ m_scanner = null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return m_regionLocation + ":" + Bytes.toString(m_startRow) + ","
+ + Bytes.toString(m_endRow);
+ }
+
+}
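
For context, here is a rough sketch (not part of the patch) of how a single HBaseSlice can be driven through its lifecycle: init() opens the HTable and scanner, next() fills one Tuple per row, and close() releases the scanner. The table name, column, and region location below are hypothetical, and init() ignores its DataStorage argument in this implementation.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pig.backend.hadoop.hbase.HBaseSlice;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Sketch only: drives one slice over the whole key range of a hypothetical table.
public class HBaseSliceSketch {
    public static void main(String[] args) throws Exception {
        HBaseSlice slice = new HBaseSlice(
                Bytes.toBytes("pigtable"),                    // table name (hypothetical)
                HConstants.EMPTY_START_ROW,                   // start row, inclusive
                HConstants.EMPTY_START_ROW,                   // empty end row => scan to table end
                new byte[][] { Bytes.toBytes("pig:col_a") },  // input columns
                "localhost");                                 // region location hint
        slice.init(null);  // the DataStorage argument is unused by this implementation
        Tuple t = TupleFactory.getInstance().newTuple();
        while (slice.next(t)) {
            System.out.println(t);  // one tuple per row, one DataByteArray per column
        }
        slice.close();
    }
}
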
Added:
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=727092&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Dec 16 09:21:18 2008
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.backend.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pig.ExecType;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.Slice;
+import org.apache.pig.Slicer;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * A <code>Slicer</code> that splits the hbase table into {@link HBaseSlice}s.
+ * A load function is also provided, but it performs no load operations; the
+ * actual loading is done in {@link HBaseSlice}.
+ */
+public class HBaseStorage extends Utf8StorageConverter implements Slicer,
+ LoadFunc {
+
+ private byte[][] m_cols;
+ private HTable m_table;
+ private HBaseConfiguration m_conf;
+
+ private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
+
+ // HBase Slicer
+ // Creates a slice per region of a specified table.
+
+ /**
+ * Constructor. Constructs an HBase table loader that loads the cells of the
+ * provided columns.
+ *
+ * @param columnList
+ * space-delimited list of column names.
+ */
+ public HBaseStorage(String columnList) {
+ String[] colNames = columnList.split(" ");
+ m_cols = new byte[colNames.length][];
+ for (int i = 0; i < m_cols.length; i++) {
+ m_cols[i] = Bytes.toBytes(colNames[i]);
+ }
+
+ m_conf = new HBaseConfiguration();
+ }
+
+ @Override
+ public Slice[] slice(DataStorage store, String tablename)
+ throws IOException {
+ validate(store, tablename);
+
+ byte[][] startKeys = m_table.getStartKeys();
+ if (startKeys == null || startKeys.length == 0) {
+ throw new IOException("Expecting at least one region");
+ }
+ if (m_cols == null || m_cols.length == 0) {
+ throw new IOException("Expecting at least one column");
+ }
+ // one region one slice
+ Slice[] slices = new Slice[startKeys.length];
+ for (int i = 0; i < startKeys.length; i++) {
+ String regionLocation = m_table.getRegionLocation(startKeys[i])
+ .getServerAddress().getHostname();
+ slices[i] = new HBaseSlice(m_table.getTableName(), startKeys[i],
+ ((i + 1) < startKeys.length) ? startKeys[i + 1]
+ : HConstants.EMPTY_START_ROW, m_cols,
+ regionLocation);
+ LOG.info("slice: " + i + "->" + slices[i]);
+ }
+
+ return slices;
+ }
+
+ @Override
+ public void validate(DataStorage store, String tablename)
+ throws IOException {
+ ensureTable(tablename);
+ }
+
+ private void ensureTable(String tablename) throws IOException {
+ if (m_table == null) {
+ m_table = new HTable(m_conf, tablename);
+ }
+ }
+
+ // HBase LoadFunc
+ // The methods below are stubs so the UDF can be cast to a LoadFunc during
+ // parsing.
+
+ @Override
+ public void bindTo(String fileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+ // do nothing
+ }
+
+ @Override
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
+ // do nothing
+ return null;
+ }
+
+ @Override
+ public void fieldsToRead(Schema schema) {
+ // do nothing
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ // do nothing
+ return null;
+ }
+}
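
Likewise, a rough sketch (not part of the patch) of exercising the Slicer side on its own: validate() opens the HTable, and slice() then returns one HBaseSlice per region, each covering [startKeys[i], startKeys[i+1]) with the last slice left open-ended. The table and column names are hypothetical, and the DataStorage argument is unused here as well.

import org.apache.pig.Slice;
import org.apache.pig.backend.hadoop.hbase.HBaseStorage;

// Sketch only: lists the per-region slices for a hypothetical table.
public class HBaseSlicerSketch {
    public static void main(String[] args) throws Exception {
        HBaseStorage storage = new HBaseStorage("pig:col_a pig:col_b"); // space-delimited columns
        storage.validate(null, "pigtable");  // opens the HTable; DataStorage is unused here
        Slice[] slices = storage.slice(null, "pigtable");
        for (Slice s : slices) {
            System.out.println(s);  // regionLocation:startRow,endRow (see HBaseSlice.toString)
        }
    }
}
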
Added: hadoop/pig/branches/types/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestHBaseStorage.java?rev=727092&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestHBaseStorage.java (added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestHBaseStorage.java Tue Dec 16 09:21:18 2008
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/** {@link org.apache.pig.backend.hadoop.hbase.HBaseStorage} Test Case **/
+public class TestHBaseStorage extends TestCase {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestHBaseStorage.class);
+
+ private MiniCluster cluster = MiniCluster.buildCluster();
+ private HBaseConfiguration conf;
+ private MiniHBaseCluster hbaseCluster;
+
+ private PigServer pig;
+
+ final static int NUM_REGIONSERVERS = 1;
+
+ // Test table information
+ private static final String TESTTABLE = "pigtable";
+ private static final String COLUMNFAMILY = "pig:";
+ private static final String TESTCOLUMN_A = "pig:col_a";
+ private static final String TESTCOLUMN_B = "pig:col_b";
+ private static final String TESTCOLUMN_C = "pig:col_c";
+ private static final HColumnDescriptor family =
+ new HColumnDescriptor(COLUMNFAMILY);
+ private static final int TEST_ROW_COUNT = 100;
+
+ @Before
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ conf = new HBaseConfiguration(ConfigurationUtil.
+ toConfiguration(cluster.getProperties()));
+ try {
+ hBaseClusterSetup();
+ } catch (Exception e) {
+ if(hbaseCluster != null) {
+ hbaseCluster.shutdown();
+ }
+ throw e;
+ }
+
+ pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ }
+
+ /**
+ * Actually start the MiniHBase instance.
+ */
+ protected void hBaseClusterSetup() throws Exception {
+ // start the mini cluster
+ hbaseCluster = new MiniHBaseCluster(conf, NUM_REGIONSERVERS);
+ // opening the META table ensures that cluster is running
+ new HTable(conf, HConstants.META_TABLE_NAME);
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ // clear the table
+ deleteTable();
+ super.tearDown();
+ try {
+ HConnectionManager.deleteConnectionInfo(conf, true);
+ if (hbaseCluster != null) {
+ try {
+ hbaseCluster.shutdown();
+ } catch (Exception e) {
+ LOG.warn("Closing mini hbase cluster", e);
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ pig.shutdown();
+ }
+
+ /**
+ * load from hbase test
+ * @throws IOException
+ * @throws ExecException
+ */
+ @Test
+ public void testLoadFromHBase() throws IOException, ExecException {
+ prepareTable();
+ pig.registerQuery("a = load '" + TESTTABLE + "' using " +
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + TESTCOLUMN_A +
+ " " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "') as (col_a, col_b:int, col_c);");
+ Iterator<Tuple> it = pig.openIterator("a");
+ int count = 0;
+ LOG.info("LoadFromHBase Starting");
+ while(it.hasNext()){
+ Tuple t = it.next();
+ LOG.info("LoadFromHBase "+ t);
+ String col_a = ((DataByteArray)t.get(0)).toString();
+ int col_b = (Integer)t.get(1);
+ String col_c = ((DataByteArray)t.get(2)).toString();
+
+ assertEquals(String.valueOf(count), col_a);
+ assertEquals(count, col_b);
+ assertEquals("TEXT" + count, col_c);
+
+ count++;
+ }
+ assertEquals(TEST_ROW_COUNT, count);
+ System.err.println("LoadFromHBase done");
+ }
+
+ /**
+ * Prepare a table in hbase for testing.
+ *
+ * @throws IOException
+ */
+ private void prepareTable() throws IOException {
+ // define the table schema
+ HTableDescriptor tabledesc = new HTableDescriptor(TESTTABLE);
+ tabledesc.addFamily(family);
+
+ // create the table
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ if(admin.tableExists(TESTTABLE)) {
+ deleteTable();
+ }
+ admin.createTable(tabledesc);
+
+ // put some data into table
+ HTable table = new HTable(conf, TESTTABLE);
+
+ BatchUpdate batchUpdate;
+
+ for(int i = 0 ; i < TEST_ROW_COUNT ; i++) {
+ String v = Integer.toString(i);
+ batchUpdate = new BatchUpdate(Bytes.toBytes(
+ "00".substring(v.length()) + v));
+ batchUpdate.put(TESTCOLUMN_A, Bytes.toBytes(v));
+ batchUpdate.put(TESTCOLUMN_B, Bytes.toBytes(v));
+ batchUpdate.put(TESTCOLUMN_C, Bytes.toBytes("TEXT" + i));
+ table.commit(batchUpdate);
+ }
+ }
+
+ private void deleteTable() throws IOException {
+ // delete the table
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ if(admin.tableExists(TESTTABLE)) {
+ admin.disableTable(TESTTABLE);
+ while(admin.isTableEnabled(TESTTABLE)) {
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ // do nothing.
+ }
+ }
+ admin.deleteTable(TESTTABLE);
+ }
+ }
+
+}