Author: pradeepkth
Date: Tue Jan 26 21:38:13 2010
New Revision: 903430
URL: http://svn.apache.org/viewvc?rev=903430&view=rev
Log:
PIG-1200: Using TableInputFormat in HBaseStorage (zjffdu via pradeepkth)
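
In brief: HBaseStorage no longer implements Slicer (HBaseSlice is removed); it now builds a Scan from its space-delimited column list and delegates input splitting and record reading to HBase's TableInputFormat, passing the table name and a Base64-encoded Scan through the job configuration. A Pig Latin load statement is unchanged at the language level, e.g. (table and column names here are hypothetical):

    raw = LOAD 'hbase://mytable'
          USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('cf:a cf:b');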
Removed:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseSlice.java
Modified:
hadoop/pig/branches/load-store-redesign/CHANGES.txt
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
Modified: hadoop/pig/branches/load-store-redesign/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/CHANGES.txt?rev=903430&r1=903429&r2=903430&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/CHANGES.txt Tue Jan 26 21:38:13 2010
@@ -22,6 +22,8 @@
INCOMPATIBLE CHANGES
+PIG-1200: Using TableInputFormat in HBaseStorage (zjffdu via pradeepkth)
+
PIG-1148: Move splitable logic from pig latin to InputFormat (zjffdu via
pradeepkth)
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=903430&r1=903429&r2=903430&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Jan 26 21:38:13 2010
@@ -16,155 +16,124 @@
*/
package org.apache.pig.backend.hadoop.hbase;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.LoadFunc;
-import org.apache.pig.Slice;
-import org.apache.pig.Slicer;
-import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
/**
* A <code>Slicer</code> that splits the hbase table into {@link HBaseSlice}s.
* A load function is also provided, but it performs no actual loading; the
* real load operations are done in {@link HBaseSlice}.
*/
-public class HBaseStorage extends LoadFunc implements Slicer {
+public class HBaseStorage extends LoadFunc {
- private byte[][] m_cols;
- private HTable m_table;
- private HBaseConfiguration m_conf;
-
- private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
-
- // HBase Slicer
- // Creates a slice per region of a specified table.
-
- /**
- * Constructor. Construct a HBase Table loader to load the cells of the
- * provided columns.
- *
- * @param columnList
- * columnlist that is a presented string delimited by space.
- */
- public HBaseStorage(String columnList) {
- String[] colNames = columnList.split(" ");
- m_cols = new byte[colNames.length][];
- for (int i = 0; i < m_cols.length; i++) {
- m_cols[i] = Bytes.toBytes(colNames[i]);
- }
-
- m_conf = new HBaseConfiguration();
- }
-
- @Override
- public Slice[] slice(DataStorage store, String tablename)
- throws IOException {
- validate(store, tablename);
-
- byte[][] startKeys = m_table.getStartKeys();
- if (startKeys == null || startKeys.length == 0) {
- throw new IOException("Expecting at least one region");
- }
- if (m_cols == null || m_cols.length == 0) {
- throw new IOException("Expecting at least one column");
- }
- // one region one slice
- Slice[] slices = new Slice[startKeys.length];
- for (int i = 0; i < startKeys.length; i++) {
- String regionLocation = m_table.getRegionLocation(startKeys[i])
- .getServerAddress().getHostname();
- slices[i] = new HBaseSlice(m_table.getTableName(), startKeys[i],
- ((i + 1) < startKeys.length) ? startKeys[i + 1]
- : HConstants.EMPTY_START_ROW, m_cols,
- regionLocation);
- LOG.info("slice: " + i + "->" + slices[i]);
- }
+ private byte[][] m_cols;
+ private HTable m_table;
+ private Configuration m_conf=new Configuration();
+ private RecordReader reader;
+ private Scan scan=new Scan();
+
+ private static final Log LOG = LogFactory.getLog(HBaseStorage.class);
+
+ // HBase Slicer
+ // Creates a slice per region of a specified table.
+
+ /**
+ * Constructor. Construct a HBase Table loader to load the cells of the
+ * provided columns.
+ *
+ * @param columnList
+ * columnlist that is a presented string delimited by space.
+ */
+ public HBaseStorage(String columnList) {
+ String[] colNames = columnList.split(" ");
+ m_cols = new byte[colNames.length][];
+ for (int i = 0; i < m_cols.length; i++) {
+ m_cols[i] = Bytes.toBytes(colNames[i]);
+ scan.addColumn(m_cols[i]);
+ }
+ }
+
+
+ @Override
+ public Tuple getNext() throws IOException {
+ try {
+ if (reader.nextKeyValue()) {
+ ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader.getCurrentKey();
+ Result result = (Result) reader.getCurrentValue();
+ Tuple tuple=TupleFactory.getInstance().newTuple(m_cols.length);
+ for (int i=0;i<m_cols.length;++i){
+ tuple.set(i, new DataByteArray(result.getValue(m_cols[i])));
+ }
+ return tuple;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ return null;
+ }
+
+ @Override
+ public InputFormat getInputFormat() {
+ TableInputFormat inputFormat = new TableInputFormat();
+ inputFormat.setConf(m_conf);
+ return inputFormat;
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) {
+ this.reader = reader;
+ }
- return slices;
- }
-
- @Override
- public void validate(DataStorage store, String tablename)
- throws IOException {
- ensureTable(tablename);
- }
-
- private void ensureTable(String tablename) throws IOException {
- LOG.info("tablename: "+tablename);
-
- // We're looking for the right scheme here (actually, we don't
- // care what the scheme is as long as it is one and it's
- // different from hdfs and file. If the user specified to use
- // the multiquery feature and did not specify a scheme we will
- // have transformed it to an absolute path. In that case we'll
- // take the last component and guess that's what was
- // meant. We'll print a warning in that case.
- int index;
- if(-1 != (index = tablename.indexOf("://"))) {
- if (tablename.startsWith("hdfs:")
- || tablename.startsWith("file:")) {
- index = tablename.lastIndexOf("/");
- if (-1 == index) {
- index = tablename.lastIndexOf("\\");
- }
-
- if (-1 == index) {
- throw new IOException("Got tablename: "+tablename
- +". Either turn off multiquery (-no_multiquery)"
- +" or specify load path as \"hbase://<tablename>\".");
- } else {
- String in = tablename;
- tablename = tablename.substring(index+1);
- LOG.warn("Got tablename: "+in+" Assuming you meant table: "
- +tablename+". Either turn off multiquery (-no_multiquery) "
- +"or specify load path as \"hbase://<tablename>\" "
- +"to avoid this warning.");
- }
- } else {
- tablename = tablename.substring(index+3);
- }
- }
-
- if (m_table == null) {
- m_table = new HTable(m_conf, tablename);
- }
- }
-
- // HBase LoadFunc
- // It is just a mock class to let the UDF be casted to a LOADFUNC during
- // parsing.
-
-
- @Override
- public Tuple getNext() throws IOException {
- // do nothing
- return null;
- }
-
- @Override
- public InputFormat getInputFormat() {
- return null;
- }
-
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split) {
- throw new UnsupportedOperationException();
- }
-
- @Override
+ @Override
public void setLocation(String location, Job job) throws IOException {
- throw new UnsupportedOperationException();
- }
+ if (location.startsWith("hbase://")){
+ m_conf.set(TableInputFormat.INPUT_TABLE, location.substring(8));
+ }else{
+ m_conf.set(TableInputFormat.INPUT_TABLE, location);
+ }
+ m_conf.set(TableInputFormat.SCAN, convertScanToString(scan));
+ }
+
+ @Override
+ public String relativeToAbsolutePath(String location, Path curDir)
+ throws IOException {
+ return location;
+ }
+
+ private static String convertScanToString(Scan scan) {
+
+ try {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(out);
+ scan.write(dos);
+ return Base64.encodeBytes(out.toByteArray());
+ } catch (IOException e) {
+ LOG.error(e);
+ return "";
+ }
+
+ }
}
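
For reference, a minimal standalone sketch of the Base64 Scan round-trip that convertScanToString() above performs, assuming the Writable-based Scan serialization of HBase 0.20 (the class name and column are hypothetical; the decode half is effectively what TableInputFormat does with the value stored under TableInputFormat.SCAN):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Base64;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanRoundTrip {
        public static void main(String[] args) throws IOException {
            // Encode: the same steps as convertScanToString() above.
            Scan scan = new Scan();
            scan.addColumn(Bytes.toBytes("cf:a")); // hypothetical column
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            scan.write(new DataOutputStream(out));
            String encoded = Base64.encodeBytes(out.toByteArray());

            // Decode: recover an equivalent Scan from the Base64 string,
            // mirroring what TableInputFormat.setConf() does internally.
            Scan decoded = new Scan();
            decoded.readFields(new DataInputStream(
                    new ByteArrayInputStream(Base64.decode(encoded))));
        }
    }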