http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRecordUpdater.java
----------------------------------------------------------------------
diff --git 
a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRecordUpdater.java 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRecordUpdater.java
new file mode 100644
index 0000000..089a299
--- /dev/null
+++ 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRecordUpdater.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.phoenix.hive.PhoenixSerializer.DmlType;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable;
+import org.apache.phoenix.hive.util.PhoenixConnectionUtil;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+import org.apache.phoenix.hive.util.PhoenixUtil;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.ConcurrentTableMutationException;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.util.QueryUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class PhoenixRecordUpdater implements RecordUpdater {
+
+    private static final Log LOG = 
LogFactory.getLog(PhoenixRecordUpdater.class);
+
+    private final Connection conn;
+    private final PreparedStatement pstmt;
+    private final long batchSize;
+    private long numRecords = 0;
+
+    private Configuration config;
+    private String tableName;
+    private MetaDataClient metaDataClient;
+    private boolean restoreWalMode;
+
+    private long rowCountDelta = 0;
+
+    private PhoenixSerializer phoenixSerializer;
+    private ObjectInspector objInspector;
+    private PreparedStatement pstmtForDelete;
+
+    public PhoenixRecordUpdater(Path path, AcidOutputFormat.Options options) 
throws IOException {
+        this.config = options.getConfiguration();
+        tableName = 
config.get(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);
+
+        Properties props = new Properties();
+
+        try {
+            // Disable WAL
+            String walConfigName = tableName.toLowerCase() + 
PhoenixStorageHandlerConstants
+                    .DISABLE_WAL;
+            boolean disableWal = config.getBoolean(walConfigName, false);
+            if (disableWal) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(walConfigName + " is true. batch.mode will be set to true.");
+                }
+
+                props.setProperty(PhoenixStorageHandlerConstants.BATCH_MODE, 
"true");
+            }
+
+            this.conn = PhoenixConnectionUtil.getInputConnection(config, 
props);
+
+            if (disableWal) {
+                metaDataClient = new MetaDataClient((PhoenixConnection) conn);
+
+                if (!PhoenixUtil.isDisabledWal(metaDataClient, tableName)) {
+                    // Execute an ALTER TABLE statement if DISABLE_WAL is not already true.
+                    try {
+                        PhoenixUtil.alterTableForWalDisable(conn, tableName, 
true);
+                    } catch (ConcurrentTableMutationException e) {
+                        if (LOG.isWarnEnabled()) {
+                            LOG.warn("Concurrent modification of disableWAL");
+                        }
+                    }
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(tableName + "'s WAL disabled.");
+                    }
+
+                    // restore original value of disable_wal at the end.
+                    restoreWalMode = true;
+                }
+            }
+
+            this.batchSize = PhoenixConfigurationUtil.getBatchSize(config);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Batch-size : " + batchSize);
+            }
+
+            String upsertQuery = QueryUtil.constructUpsertStatement(tableName, 
PhoenixUtil
+                    .getColumnInfoList(conn, tableName));
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Upsert-query : " + upsertQuery);
+            }
+            this.pstmt = this.conn.prepareStatement(upsertQuery);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+
+        this.objInspector = options.getInspector();
+        try {
+            phoenixSerializer = new PhoenixSerializer(config, 
options.getTableProperties());
+        } catch (SerDeException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#insert(long, 
java.lang.Object)
+     */
+    @Override
+    public void insert(long currentTransaction, Object row) throws IOException 
{
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Insert - currentTranscation : " + currentTransaction + 
", row : " +
+                    PhoenixStorageHandlerUtil.toString(row));
+        }
+
+        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) 
phoenixSerializer
+                .serialize(row, objInspector, DmlType.INSERT);
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Data : " + pResultWritable.getValueList());
+        }
+
+        write(pResultWritable);
+
+        rowCountDelta++;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#update(long, 
java.lang.Object)
+     */
+    @Override
+    public void update(long currentTransaction, Object row) throws IOException 
{
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Update - currentTranscation : " + currentTransaction + 
", row : " +
+                    PhoenixStorageHandlerUtil.toString(row));
+        }
+
+        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) 
phoenixSerializer
+                .serialize(row, objInspector, DmlType.UPDATE);
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Data : " + pResultWritable.getValueList());
+        }
+
+        write(pResultWritable);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#delete(long, 
java.lang.Object)
+     */
+    @Override
+    public void delete(long currentTransaction, Object row) throws IOException 
{
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Delete - currentTranscation : " + currentTransaction + 
", row : " +
+                    PhoenixStorageHandlerUtil.toString(row));
+        }
+
+        PhoenixResultWritable pResultWritable = (PhoenixResultWritable) 
phoenixSerializer
+                .serialize(row, objInspector, DmlType.DELETE);
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Data : " + pResultWritable.getValueList());
+        }
+
+        if (pstmtForDelete == null) {
+            try {
+                String deleteQuery = 
PhoenixUtil.constructDeleteStatement(conn, tableName);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Delete query : " + deleteQuery);
+                }
+
+                pstmtForDelete = conn.prepareStatement(deleteQuery);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+
+        delete(pResultWritable);
+
+        rowCountDelta--;
+    }
+
+    private void delete(PhoenixResultWritable pResultWritable) throws 
IOException {
+        try {
+            pResultWritable.delete(pstmtForDelete);
+            numRecords++;
+            pstmtForDelete.executeUpdate();
+
+            if (numRecords % batchSize == 0) {
+                LOG.debug("Commit called on a batch of size : " + batchSize);
+                conn.commit();
+            }
+        } catch (SQLException e) {
+            throw new IOException("Exception while deleting to table.", e);
+        }
+    }
+
+    private void write(PhoenixResultWritable pResultWritable) throws 
IOException {
+        try {
+            pResultWritable.write(pstmt);
+            numRecords++;
+            pstmt.executeUpdate();
+
+            if (numRecords % batchSize == 0) {
+                LOG.debug("Commit called on a batch of size : " + batchSize);
+                conn.commit();
+            }
+        } catch (SQLException e) {
+            throw new IOException("Exception while writing to table.", e);
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#flush()
+     */
+    @Override
+    public void flush() throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Flush called");
+        }
+
+        try {
+            conn.commit();
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Written row : " + numRecords);
+            }
+        } catch (SQLException e) {
+            LOG.error("SQLException while performing the commit for the 
task.");
+            throw new IOException(e);
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#close(boolean)
+     */
+    @Override
+    public void close(boolean abort) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("abort : " + abort);
+        }
+
+        try {
+            conn.commit();
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Written row : " + numRecords);
+            }
+        } catch (SQLException e) {
+            LOG.error("SQLException while performing the commit for the 
task.");
+            throw new IOException(e);
+        } finally {
+            try {
+                if (restoreWalMode && 
PhoenixUtil.isDisabledWal(metaDataClient, tableName)) {
+                    try {
+                        PhoenixUtil.alterTableForWalDisable(conn, tableName, 
false);
+                    } catch (ConcurrentTableMutationException e) {
+                        if (LOG.isWarnEnabled()) {
+                            LOG.warn("Concurrent modification of disableWAL");
+                        }
+                    }
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(tableName + "'s WAL enabled.");
+                    }
+                }
+
+                // flush when [table-name].auto.flush is true.
+                String autoFlushConfigName = tableName.toLowerCase() +
+                        PhoenixStorageHandlerConstants.AUTO_FLUSH;
+                boolean autoFlush = config.getBoolean(autoFlushConfigName, 
false);
+                if (autoFlush) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("autoFlush is " + autoFlush);
+                    }
+
+                    PhoenixUtil.flush(conn, tableName);
+                }
+
+                PhoenixUtil.closeResource(pstmt);
+                PhoenixUtil.closeResource(pstmtForDelete);
+                PhoenixUtil.closeResource(conn);
+            } catch (SQLException ex) {
+                LOG.error("SQLException while closing the connection for the 
task.");
+                throw new IOException(ex);
+            }
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.hive.ql.io.RecordUpdater#getStats()
+     */
+    @Override
+    public SerDeStats getStats() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("getStats called");
+        }
+
+        SerDeStats stats = new SerDeStats();
+        stats.setRowCount(rowCountDelta);
+        // Don't worry about setting raw data size diff.  There is no 
reasonable way  to calculate
+        // that without finding the row we are updating or deleting, which 
would be a mess.
+        return stats;
+    }
+
+}
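
The updater above funnels every row through a single upsert PreparedStatement and commits once per batchSize rows, with a final commit in flush()/close(). A minimal standalone sketch of that batching pattern over plain JDBC; the connection URL, table, and columns are assumptions for illustration, not taken from the patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class BatchedUpsertSketch {
        public static void main(String[] args) throws SQLException {
            long batchSize = 1000;   // mirrors PhoenixConfigurationUtil.getBatchSize(config)
            long numRecords = 0;

            // Hypothetical Phoenix JDBC URL and table; the real code derives these from the job config.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181:/hbase");
                 PreparedStatement pstmt = conn.prepareStatement(
                         "UPSERT INTO MY_TABLE (ID, NAME) VALUES (?, ?)")) {
                for (long id = 0; id < 10_000; id++) {
                    pstmt.setLong(1, id);
                    pstmt.setString(2, "row-" + id);
                    pstmt.executeUpdate();
                    numRecords++;

                    // Same rule as write()/delete(): commit once a full batch has been sent.
                    if (numRecords % batchSize == 0) {
                        conn.commit();
                    }
                }
                conn.commit();   // final commit, as in flush()/close()
            }
        }
    }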

http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRow.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRow.java 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRow.java
new file mode 100644
index 0000000..fa307ce
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRow.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.serde2.StructObject;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of Hive's StructObject that exposes a Phoenix result row to the SerDe.
+ */
+public class PhoenixRow implements StructObject {
+
+    private List<String> columnList;
+    private Map<String, Object> resultRowMap;
+
+    public PhoenixRow(List<String> columnList) {
+        this.columnList = columnList;
+    }
+
+    public PhoenixRow setResultRowMap(Map<String, Object> resultRowMap) {
+        this.resultRowMap = resultRowMap;
+        return this;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.hive.serde2.StructObject#getField(int)
+     */
+    @Override
+    public Object getField(int fieldID) {
+        return resultRowMap.get(columnList.get(fieldID));
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.hive.serde2.StructObject#getFieldsAsList()
+     */
+    @Override
+    public List<Object> getFieldsAsList() {
+        return Lists.newArrayList(resultRowMap.values());
+    }
+
+
+    @Override
+    public String toString() {
+        return resultRowMap.toString();
+    }
+}
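
PhoenixRow resolves a Hive field index to a column name and looks the value up in the current result map. A small usage sketch, assuming the class above is on the classpath; the column names and values are illustrative:

    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.phoenix.hive.PhoenixRow;

    public class PhoenixRowSketch {
        public static void main(String[] args) {
            // Column names as the SerDe supplies them (upper-cased to match Phoenix metadata).
            PhoenixRow row = new PhoenixRow(Arrays.asList("ID", "NAME"));

            Map<String, Object> resultMap = new LinkedHashMap<>();
            resultMap.put("ID", 1L);
            resultMap.put("NAME", "phoenix");
            row.setResultRowMap(resultMap);

            System.out.println(row.getField(0));        // 1 (value of column ID)
            System.out.println(row.getFieldsAsList());  // [1, phoenix]
        }
    }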

http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRowKey.java
----------------------------------------------------------------------
diff --git 
a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRowKey.java 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRowKey.java
new file mode 100644
index 0000000..c4cbb2c
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixRowKey.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Phoenix implementation of Hive's RecordIdentifier that also carries the row's primary-key values.
+ */
+
+public class PhoenixRowKey extends RecordIdentifier {
+
+    private Map<String, Object> rowKeyMap = Maps.newHashMap();
+
+    public PhoenixRowKey() {
+
+    }
+
+    public void setRowKeyMap(Map<String, Object> rowKeyMap) {
+        this.rowKeyMap = rowKeyMap;
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+        super.write(dataOutput);
+
+        try (ObjectOutputStream oos = new ObjectOutputStream((OutputStream) 
dataOutput)) {
+            oos.writeObject(rowKeyMap);
+            oos.flush();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        super.readFields(dataInput);
+
+        try (ObjectInputStream ois = new ObjectInputStream((InputStream) 
dataInput)) {
+            rowKeyMap = (Map<String, Object>) ois.readObject();
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
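
PhoenixRowKey extends Hive's RecordIdentifier and appends the primary-key map via plain Java serialization, so it round-trips through ordinary Data streams. A sketch of that round trip, assuming the class above is on the classpath; the key values are illustrative:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.phoenix.hive.PhoenixRowKey;

    public class PhoenixRowKeySketch {
        public static void main(String[] args) throws Exception {
            PhoenixRowKey key = new PhoenixRowKey();
            Map<String, Object> pk = new HashMap<>();
            pk.put("ID", 42L);
            key.setRowKeyMap(pk);

            // write() emits the RecordIdentifier fields first, then the map as an object stream.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                key.write(out);
            }

            // readFields() restores both parts into a fresh instance.
            PhoenixRowKey copy = new PhoenixRowKey();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            // copy now carries the same RecordIdentifier fields and row-key map.
        }
    }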

http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java
----------------------------------------------------------------------
diff --git 
a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java
new file mode 100644
index 0000000..dd38cfb
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import 
org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.phoenix.hive.PhoenixSerializer.DmlType;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable;
+import org.apache.phoenix.hive.objectinspector.PhoenixObjectInspectorFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * SerDe implementation for the Phoenix Hive storage handler.
+ *
+ */
+public class PhoenixSerDe extends AbstractSerDe {
+
+    public static final Log LOG = LogFactory.getLog(PhoenixSerDe.class);
+
+    private PhoenixSerializer serializer;
+    private ObjectInspector objectInspector;
+
+    private LazySerDeParameters serdeParams;
+    private PhoenixRow row;
+
+    private Properties tableProperties;
+
+    /**
+     * @throws SerDeException
+     */
+    public PhoenixSerDe() throws SerDeException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("PhoenixSerDe created");
+        }
+    }
+
+    @Override
+    public void initialize(Configuration conf, Properties tbl) throws 
SerDeException {
+        tableProperties = tbl;
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("SerDe initialize : " + tbl.getProperty("name"));
+        }
+
+        serdeParams = new LazySerDeParameters(conf, tbl, getClass().getName());
+        objectInspector = createLazyPhoenixInspector(conf, tbl);
+
+        String inOutWork = 
tbl.getProperty(PhoenixStorageHandlerConstants.IN_OUT_WORK);
+        if (inOutWork == null) {
+            return;
+        }
+
+        serializer = new PhoenixSerializer(conf, tbl);
+        row = new PhoenixRow(Lists.transform(serdeParams.getColumnNames(), new 
Function<String,
+                String>() {
+
+            @Override
+            public String apply(String input) {
+                return input.toUpperCase();
+            }
+        }));
+    }
+
+    @Override
+    public Object deserialize(Writable result) throws SerDeException {
+        if (!(result instanceof PhoenixResultWritable)) {
+            throw new SerDeException(result.getClass().getName() + ": expects 
" +
+                    "PhoenixResultWritable!");
+        }
+
+        return row.setResultRowMap(((PhoenixResultWritable) 
result).getResultMap());
+    }
+
+    @Override
+    public Class<? extends Writable> getSerializedClass() {
+        return PhoenixResultWritable.class;
+    }
+
+    @Override
+    public Writable serialize(Object obj, ObjectInspector objInspector) throws 
SerDeException {
+        try {
+            return serializer.serialize(obj, objInspector, DmlType.NONE);
+        } catch (Exception e) {
+            throw new SerDeException(e);
+        }
+    }
+
+    @Override
+    public SerDeStats getSerDeStats() {
+        // no support for statistics
+        return null;
+    }
+
+    public Properties getTableProperties() {
+        return tableProperties;
+    }
+
+    public LazySerDeParameters getSerdeParams() {
+        return serdeParams;
+    }
+
+    @Override
+    public ObjectInspector getObjectInspector() throws SerDeException {
+        return objectInspector;
+    }
+
+    private ObjectInspector createLazyPhoenixInspector(Configuration conf, 
Properties tbl) throws
+            SerDeException {
+        List<String> columnNameList = 
Arrays.asList(tbl.getProperty(serdeConstants.LIST_COLUMNS)
+                .split(PhoenixStorageHandlerConstants.COMMA));
+        List<TypeInfo> columnTypeList = 
TypeInfoUtils.getTypeInfosFromTypeString(tbl.getProperty
+                (serdeConstants.LIST_COLUMN_TYPES));
+
+        List<ObjectInspector> columnObjectInspectors = 
Lists.newArrayListWithExpectedSize
+                (columnTypeList.size());
+
+        for (TypeInfo typeInfo : columnTypeList) {
+            
columnObjectInspectors.add(PhoenixObjectInspectorFactory.createObjectInspector
+                    (typeInfo, serdeParams));
+        }
+
+        return 
LazyObjectInspectorFactory.getLazySimpleStructObjectInspector(columnNameList,
+                columnObjectInspectors, null, serdeParams.getSeparators()[0], 
serdeParams,
+                ObjectInspectorOptions.JAVA);
+    }
+}
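
initialize() reads the column layout from the standard Hive SerDe properties and the target table from phoenix.table.name; the in.out.work flag is set by PhoenixStorageHandler so the serializer is only built when the SerDe is actually used for reads or writes. A sketch of the Properties Hive would pass in; the values are illustrative and are normally supplied by the metastore and storage handler:

    import java.util.Properties;

    public class PhoenixSerDePropertiesSketch {
        public static void main(String[] args) {
            Properties tbl = new Properties();

            // Standard Hive SerDe properties (serdeConstants.LIST_COLUMNS / LIST_COLUMN_TYPES).
            tbl.setProperty("columns", "id,name,created");
            tbl.setProperty("columns.types", "bigint:string:timestamp");

            // Phoenix-specific property read by this patch.
            tbl.setProperty("phoenix.table.name", "MY_TABLE");

            // Set by PhoenixStorageHandler.configure*JobProperties(); without it,
            // initialize() returns before creating the PhoenixSerializer.
            tbl.setProperty("in.out.work", "input");

            System.out.println(tbl);
        }
    }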

http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java
new file mode 100644
index 0000000..e43ed0e
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable;
+import org.apache.phoenix.hive.util.PhoenixConnectionUtil;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+import org.apache.phoenix.hive.util.PhoenixUtil;
+import org.apache.phoenix.util.ColumnInfo;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Serializer used in PhoenixSerDe and PhoenixRecordUpdater to produce 
Writable.
+ */
+public class PhoenixSerializer {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixSerializer.class);
+
+    public static enum DmlType {
+        NONE,
+        SELECT,
+        INSERT,
+        UPDATE,
+        DELETE
+    }
+
+    private int columnCount = 0;
+    private PhoenixResultWritable pResultWritable;
+
+    public PhoenixSerializer(Configuration config, Properties tbl) throws 
SerDeException {
+        try (Connection conn = 
PhoenixConnectionUtil.getInputConnection(config, tbl)) {
+            List<ColumnInfo> columnMetadata = 
PhoenixUtil.getColumnInfoList(conn, tbl.getProperty
+                    (PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME));
+
+            columnCount = columnMetadata.size();
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Column-meta : " + columnMetadata);
+            }
+
+            pResultWritable = new PhoenixResultWritable(config, 
columnMetadata);
+        } catch (SQLException | IOException e) {
+            throw new SerDeException(e);
+        }
+    }
+
+    public Writable serialize(Object values, ObjectInspector objInspector, 
DmlType dmlType) {
+        pResultWritable.clear();
+
+        final StructObjectInspector structInspector = (StructObjectInspector) 
objInspector;
+        final List<? extends StructField> fieldList = 
structInspector.getAllStructFieldRefs();
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("FieldList : " + fieldList + " values(" + 
values.getClass() + ") : " +
+                    values);
+        }
+
+        int fieldCount = columnCount;
+        if (dmlType == DmlType.UPDATE || dmlType == DmlType.DELETE) {
+            fieldCount++;
+        }
+
+        for (int i = 0; i < fieldCount; i++) {
+            if (fieldList.size() <= i) {
+                break;
+            }
+
+            StructField structField = fieldList.get(i);
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("structField[" + i + "] : " + structField);
+            }
+
+            if (structField != null) {
+                Object fieldValue = structInspector.getStructFieldData(values, 
structField);
+                ObjectInspector fieldOI = 
structField.getFieldObjectInspector();
+
+                String fieldName = structField.getFieldName();
+
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Field " + fieldName + "[" + i + "] : " + 
fieldValue + ", " +
+                            fieldOI);
+                }
+
+                Object value = null;
+                switch (fieldOI.getCategory()) {
+                    case PRIMITIVE:
+                        value = ((PrimitiveObjectInspector) 
fieldOI).getPrimitiveJavaObject
+                                (fieldValue);
+
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Field " + fieldName + "[" + i + "] : " 
+ value + "(" + value
+                                    .getClass() + ")");
+                        }
+
+                        if (value instanceof HiveDecimal) {
+                            value = ((HiveDecimal) value).bigDecimalValue();
+                        } else if (value instanceof HiveChar) {
+                            value = ((HiveChar) value).getValue().trim();
+                        }
+
+                        pResultWritable.add(value);
+                        break;
+                    case LIST:
+                        // Arrays are not supported in INSERT statements yet.
+                        break;
+                    case STRUCT:
+                        if (dmlType == DmlType.DELETE) {
+                            // For update/delete, the first value is a struct<transactionid:bigint,bucketid:int,rowid:bigint,primaryKey:binary>.
+                            List<Object> fieldValueList = 
((StandardStructObjectInspector)
+                                    
fieldOI).getStructFieldsDataAsList(fieldValue);
+
+                            // Convert the binary primary key back into a column/value map.
+                            @SuppressWarnings("unchecked")
+                            Map<String, Object> primaryKeyMap = (Map<String, 
Object>)
+                                    
PhoenixStorageHandlerUtil.toMap(((BytesWritable)
+                                            fieldValueList.get(3)).getBytes());
+                            for (Object pkValue : primaryKeyMap.values()) {
+                                pResultWritable.add(pkValue);
+                            }
+                        }
+
+                        break;
+                    default:
+                        new SerDeException("Phoenix Unsupported column type: " 
+ fieldOI
+                                .getCategory());
+                }
+            }
+        }
+
+        return pResultWritable;
+    }
+}
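
In the PRIMITIVE branch above, a few Hive wrapper types are converted before the value is handed to Phoenix: HiveDecimal becomes java.math.BigDecimal and HiveChar is unwrapped and trimmed of its CHAR padding. A minimal sketch of those conversions, assuming hive-common is on the classpath:

    import java.math.BigDecimal;

    import org.apache.hadoop.hive.common.type.HiveChar;
    import org.apache.hadoop.hive.common.type.HiveDecimal;

    public class HiveTypeConversionSketch {
        public static void main(String[] args) {
            // HiveDecimal -> BigDecimal, as in the PRIMITIVE case of serialize().
            HiveDecimal hd = HiveDecimal.create("123.450");
            BigDecimal bd = hd.bigDecimalValue();

            // HiveChar -> trimmed String; CHAR values are padded to their declared length.
            HiveChar hc = new HiveChar("abc", 10);
            String s = hc.getValue().trim();

            System.out.println(bd + " / '" + s + "'");
        }
    }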

http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
new file mode 100644
index 0000000..e8b5b19
--- /dev/null
+++ 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.hive.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer;
+import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposerManager;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * This class manages the initial Phoenix/Hive table configuration and SerDe selection.
+ */
+@SuppressWarnings("deprecation")
+public class PhoenixStorageHandler extends DefaultStorageHandler implements
+        HiveStoragePredicateHandler, InputEstimator {
+
+    private static final Log LOG = 
LogFactory.getLog(PhoenixStorageHandler.class);
+
+    public PhoenixStorageHandler() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("PhoenixStorageHandler created");
+        }
+    }
+
+    @Override
+    public HiveMetaHook getMetaHook() {
+        return new PhoenixMetaHook();
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Class<? extends OutputFormat> getOutputFormatClass() {
+        return PhoenixOutputFormat.class;
+    }
+
+    @Override
+    public void configureInputJobProperties(TableDesc tableDesc, Map<String, 
String>
+            jobProperties) {
+        configureJobProperties(tableDesc, jobProperties);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Configuring input job for table : " + 
tableDesc.getTableName());
+        }
+
+        // For initialization efficiency, tell the SerDe whether this is input or output work.
+        
tableDesc.getProperties().setProperty(PhoenixStorageHandlerConstants.IN_OUT_WORK,
+                PhoenixStorageHandlerConstants.IN_WORK);
+    }
+
+    @Override
+    public void configureOutputJobProperties(TableDesc tableDesc, Map<String, 
String>
+            jobProperties) {
+        configureJobProperties(tableDesc, jobProperties);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Configuring output job for  table : " + 
tableDesc.getTableName());
+        }
+
+        // For initialization efficiency, tell the SerDe whether this is input or output work.
+        
tableDesc.getProperties().setProperty(PhoenixStorageHandlerConstants.IN_OUT_WORK,
+                PhoenixStorageHandlerConstants.OUT_WORK);
+    }
+
+    @Override
+    public void configureTableJobProperties(TableDesc tableDesc, Map<String, 
String>
+            jobProperties) {
+        configureJobProperties(tableDesc, jobProperties);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void configureJobProperties(TableDesc tableDesc, Map<String, 
String> jobProperties) {
+        Properties tableProperties = tableDesc.getProperties();
+
+        String inputFormatClassName =
+                tableProperties.getProperty(PhoenixStorageHandlerConstants
+                        .HBASE_INPUT_FORMAT_CLASS);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(PhoenixStorageHandlerConstants.HBASE_INPUT_FORMAT_CLASS 
+ " is " +
+                    inputFormatClassName);
+        }
+
+        Class<?> inputFormatClass;
+        try {
+            if (inputFormatClassName != null) {
+                inputFormatClass = JavaUtils.loadClass(inputFormatClassName);
+            } else {
+                inputFormatClass = PhoenixInputFormat.class;
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+            throw new RuntimeException(e);
+        }
+
+        if (inputFormatClass != null) {
+            tableDesc.setInputFileFormatClass((Class<? extends InputFormat>) 
inputFormatClass);
+        }
+
+        String tableName = 
tableProperties.getProperty(PhoenixStorageHandlerConstants
+                .PHOENIX_TABLE_NAME);
+        if (tableName == null) {
+            tableName = tableDesc.getTableName();
+            
tableProperties.setProperty(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME,
+                    tableName);
+        }
+
+        jobProperties.put(PhoenixConfigurationUtil.INPUT_TABLE_NAME, 
tableName);
+        jobProperties.put(PhoenixStorageHandlerConstants.ZOOKEEPER_QUORUM, 
tableProperties
+                .getProperty(PhoenixStorageHandlerConstants.ZOOKEEPER_QUORUM,
+                        
PhoenixStorageHandlerConstants.DEFAULT_ZOOKEEPER_QUORUM));
+        jobProperties.put(PhoenixStorageHandlerConstants.ZOOKEEPER_PORT, 
tableProperties
+                .getProperty(PhoenixStorageHandlerConstants.ZOOKEEPER_PORT, 
String.valueOf
+                        
(PhoenixStorageHandlerConstants.DEFAULT_ZOOKEEPER_PORT)));
+        jobProperties.put(PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT, 
tableProperties
+                .getProperty(PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT,
+                        
PhoenixStorageHandlerConstants.DEFAULT_ZOOKEEPER_PARENT));
+
+        jobProperties.put(hive_metastoreConstants.META_TABLE_STORAGE, 
this.getClass().getName());
+
+        // Set the configuration needed when working directly with HBase.
+        jobProperties.put(HConstants.ZOOKEEPER_QUORUM, jobProperties.get
+                (PhoenixStorageHandlerConstants.ZOOKEEPER_QUORUM));
+        jobProperties.put(HConstants.ZOOKEEPER_CLIENT_PORT, jobProperties.get
+                (PhoenixStorageHandlerConstants.ZOOKEEPER_PORT));
+        jobProperties.put(HConstants.ZOOKEEPER_ZNODE_PARENT, jobProperties.get
+                (PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT));
+    }
+
+    @Override
+    public Class<? extends SerDe> getSerDeClass() {
+        return PhoenixSerDe.class;
+    }
+
+    @Override
+    public DecomposedPredicate decomposePredicate(JobConf jobConf, 
Deserializer deserializer,
+                                                  ExprNodeDesc predicate) {
+        PhoenixSerDe phoenixSerDe = (PhoenixSerDe) deserializer;
+        String tableName = phoenixSerDe.getTableProperties().getProperty
+                (PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME);
+        String predicateKey = 
PhoenixStorageHandlerUtil.getTableKeyOfSession(jobConf, tableName);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Decomposing predicate with predicateKey : " + 
predicateKey);
+        }
+
+        List<String> columnNameList = 
phoenixSerDe.getSerdeParams().getColumnNames();
+        PhoenixPredicateDecomposer predicateDecomposer = 
PhoenixPredicateDecomposerManager
+                .createPredicateDecomposer(predicateKey, columnNameList);
+
+        return predicateDecomposer.decomposePredicate(predicate);
+    }
+
+    @Override
+    public Estimation estimate(JobConf job, TableScanOperator ts, long 
remaining) throws
+            HiveException {
+        String hiveTableName = ts.getConf().getTableMetadata().getTableName();
+        int reducerCount = job.getInt(hiveTableName + 
PhoenixStorageHandlerConstants
+                .PHOENIX_REDUCER_NUMBER, 1);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Estimating input size for table: " + hiveTableName + " 
with reducer count " +
+                    reducerCount + ". Remaining : " + remaining);
+        }
+
+        long bytesPerReducer = 
job.getLong(HiveConf.ConfVars.BYTESPERREDUCER.varname,
+                
Long.parseLong(HiveConf.ConfVars.BYTESPERREDUCER.getDefaultValue()));
+        long totalLength = reducerCount * bytesPerReducer;
+
+        return new Estimation(0, totalLength);
+    }
+}
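
configureJobProperties() above resolves the ZooKeeper settings from the table properties with defaults of localhost, 2181 and /hbase, then mirrors them into the HBase client keys. A sketch of that defaulting logic using the literal key names behind the PhoenixStorageHandlerConstants and HConstants fields; the empty table properties stand in for a table created without overrides:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class ZookeeperDefaultsSketch {
        public static void main(String[] args) {
            Properties tableProperties = new Properties();      // no phoenix.* overrides set
            Map<String, String> jobProperties = new HashMap<>();

            // Same fallbacks as configureJobProperties(): localhost:2181, parent /hbase.
            jobProperties.put("phoenix.zookeeper.quorum",
                    tableProperties.getProperty("phoenix.zookeeper.quorum", "localhost"));
            jobProperties.put("phoenix.zookeeper.client.port",
                    tableProperties.getProperty("phoenix.zookeeper.client.port", "2181"));
            jobProperties.put("phoenix.zookeeper.znode.parent",
                    tableProperties.getProperty("phoenix.zookeeper.znode.parent", "/hbase"));

            // Mirrored into the HBase client keys so direct HBase access sees the same ensemble.
            jobProperties.put("hbase.zookeeper.quorum",
                    jobProperties.get("phoenix.zookeeper.quorum"));
            jobProperties.put("hbase.zookeeper.property.clientPort",
                    jobProperties.get("phoenix.zookeeper.client.port"));
            jobProperties.put("zookeeper.znode.parent",
                    jobProperties.get("phoenix.zookeeper.znode.parent"));

            System.out.println(jobProperties);
        }
    }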

http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java
----------------------------------------------------------------------
diff --git 
a/phoenix-hive/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java
 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java
new file mode 100644
index 0000000..07c374e
--- /dev/null
+++ 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/constants/PhoenixStorageHandlerConstants.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.constants;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.IntWritable;
+
+import java.util.List;
+
+/**
+ * Constants used by the Hive storage handler implementation.
+ */
+public class PhoenixStorageHandlerConstants {
+
+    public static final String HBASE_INPUT_FORMAT_CLASS = 
"phoenix.input.format.class";
+
+    public static final String PHOENIX_TABLE_NAME = "phoenix.table.name";
+
+    public static final String DEFAULT_PHOENIX_INPUT_CLASS = 
"org.apache.phoenix.hive.mapreduce" +
+            ".PhoenixResultWritable";
+
+    public static final String ZOOKEEPER_QUORUM = "phoenix.zookeeper.quorum";
+    public static final String ZOOKEEPER_PORT = 
"phoenix.zookeeper.client.port";
+    public static final String ZOOKEEPER_PARENT = 
"phoenix.zookeeper.znode.parent";
+    public static final String DEFAULT_ZOOKEEPER_QUORUM = "localhost";
+    public static final int DEFAULT_ZOOKEEPER_PORT = 2181;
+    public static final String DEFAULT_ZOOKEEPER_PARENT = "/hbase";
+
+    public static final String PHOENIX_ROWKEYS = "phoenix.rowkeys";
+    public static final String PHOENIX_COLUMN_MAPPING = 
"phoenix.column.mapping";
+    public static final String PHOENIX_TABLE_OPTIONS = "phoenix.table.options";
+
+    public static final String PHOENIX_TABLE_QUERY_HINT = ".query.hint";
+    public static final String PHOENIX_REDUCER_NUMBER = ".reducer.count";
+    public static final String DISABLE_WAL = ".disable.wal";
+    public static final String BATCH_MODE = "batch.mode";
+    public static final String AUTO_FLUSH = ".auto.flush";
+
+    public static final String COLON = ":";
+    public static final String COMMA = ",";
+    public static final String EMPTY_STRING = "";
+    public static final String SPACE = " ";
+    public static final String LEFT_ROUND_BRACKET = "(";
+    public static final String RIGHT_ROUND_BRACKET = ")";
+    public static final String QUOTATION_MARK = "'";
+    public static final String EQUAL = "=";
+    public static final String IS = "is";
+    public static final String QUESTION = "?";
+
+    public static final String SPLIT_BY_STATS = "split.by.stats";
+    public static final String HBASE_SCAN_CACHE = "hbase.scan.cache";
+    public static final String HBASE_SCAN_CACHEBLOCKS = 
"hbase.scan.cacheblock";
+    public static final String HBASE_DATE_FORMAT = "hbase.date.format";
+    public static final String HBASE_TIMESTAMP_FORMAT = 
"hbase.timestamp.format";
+    public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
+    public static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-dd 
HH:mm:ss.SSS";
+
+    public static final String IN_OUT_WORK = "in.out.work";
+    public static final String IN_WORK = "input";
+    public static final String OUT_WORK = "output";
+
+    public static final String MR = "mr";
+    public static final String TEZ = "tez";
+    public static final String SPARK = "spark";
+
+    public static final String DATE_TYPE = "date";
+    public static final String TIMESTAMP_TYPE = "timestamp";
+    public static final String BETWEEN_COMPARATOR = "between";
+    public static final String IN_COMPARATOR = "in";
+    public static final List<String> COMMON_COMPARATOR = 
Lists.newArrayList("=", "<", ">", "<=",
+            ">=");
+
+    // date/timestamp
+    public static final String COLUMNE_MARKER = "$columnName$";
+    public static final String PATERN_MARKER = "$targetPattern$";
+    public static final String DATE_PATTERN = "'?\\d{4}-\\d{2}-\\d{2}'?";
+    public static final String TIMESTAMP_PATTERN = "'?\\d{4}-\\d{2}-\\d{2} 
\\d{2}:\\d{2}:\\d{2}\\" +
+            ".?\\d{0,3}'?";
+    public static final String COMMON_OPERATOR_PATTERN = "(\\(?" + 
COLUMNE_MARKER + "\\)?\\s*" +
+            "(=|>|<|<=|>=)\\s*(" + PATERN_MARKER + "))";
+    public static final String BETWEEN_OPERATOR_PATTERN = "(\\(?" + 
COLUMNE_MARKER + "\\)?\\s*(" +
+            "(?i)not)?\\s*(?i)between\\s*(" + PATERN_MARKER + 
")\\s*(?i)and\\s*(" + PATERN_MARKER
+            + "))";
+    public static final String IN_OPERATOR_PATTERN = "(\\(?" + COLUMNE_MARKER 
+ "\\)?\\s*((?i)" +
+            "not)?\\s*(?i)in\\s*\\((" + PATERN_MARKER + ",?\\s*)+\\))";
+
+    public static final String FUNCTION_VALUE_MARKER = "$value$";
+    public static final String DATE_FUNCTION_TEMPLETE = "to_date(" + 
FUNCTION_VALUE_MARKER + ")";
+    public static final String TIMESTAMP_FUNCTION_TEMPLATE = "to_timestamp(" +
+            FUNCTION_VALUE_MARKER + ")";
+
+    public static final IntWritable INT_ZERO = new IntWritable(0);
+}
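
The $columnName$ and $targetPattern$ markers above are placeholders that are substituted into the operator patterns before they are used as regular expressions elsewhere in the handler. A sketch of that expansion for a hypothetical date column named created, assuming the constants class above is on the classpath:

    import java.util.regex.Pattern;

    import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;

    public class PredicatePatternSketch {
        public static void main(String[] args) {
            // Expand COMMON_OPERATOR_PATTERN for a date column, using DATE_PATTERN as the literal form.
            String regex = PhoenixStorageHandlerConstants.COMMON_OPERATOR_PATTERN
                    .replace(PhoenixStorageHandlerConstants.COLUMNE_MARKER, "created")
                    .replace(PhoenixStorageHandlerConstants.PATERN_MARKER,
                            PhoenixStorageHandlerConstants.DATE_PATTERN);

            // Matches simple comparisons against date literals in a where-clause.
            System.out.println(Pattern.compile(regex)
                    .matcher("created = '2016-01-01'")
                    .find());   // true
        }
    }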

http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git 
a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
new file mode 100644
index 0000000..0944bb7
--- /dev/null
+++ 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.mapreduce;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSizeCalculator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer;
+import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposerManager;
+import org.apache.phoenix.hive.ql.index.IndexSearchCondition;
+import org.apache.phoenix.hive.query.PhoenixQueryBuilder;
+import org.apache.phoenix.hive.util.PhoenixConnectionUtil;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.util.PhoenixRuntime;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Custom InputFormat that feeds Phoenix data into Hive.
+ */
+@SuppressWarnings({"deprecation", "rawtypes"})
+public class PhoenixInputFormat<T extends DBWritable> implements 
InputFormat<WritableComparable,
+        T> {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
+
+    public PhoenixInputFormat() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("PhoenixInputFormat created");
+        }
+    }
+
+    @Override
+    public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws 
IOException {
+        String tableName = 
jobConf.get(PhoenixConfigurationUtil.INPUT_TABLE_NAME);
+        List<IndexSearchCondition> conditionList = null;
+        String query;
+        String executionEngine = 
jobConf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname,
+                HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.getDefaultValue());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Target table name at split phase : " + tableName + 
"with whereCondition :" +
+                    jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR) +
+                    " and " + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname 
+ " : " +
+                    executionEngine);
+        }
+
+        if (PhoenixStorageHandlerConstants.MR.equals(executionEngine)) {
+            String predicateKey = 
PhoenixStorageHandlerUtil.getTableKeyOfSession(jobConf,
+                    tableName);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("PredicateKey for MR job : " + predicateKey);
+            }
+
+            PhoenixPredicateDecomposer predicateDecomposer =
+                    
PhoenixPredicateDecomposerManager.getPredicateDecomposer(predicateKey);
+            if (predicateDecomposer != null && 
predicateDecomposer.isCalledPPD()) {
+                conditionList = predicateDecomposer.getSearchConditionList();
+            }
+
+            query = PhoenixQueryBuilder.getInstance().buildQuery(jobConf, 
tableName,
+                    ColumnProjectionUtils.getReadColumnNames(jobConf), 
conditionList);
+        } else if (PhoenixStorageHandlerConstants.TEZ.equals(executionEngine)) 
{
+            Map<String, String> columnTypeMap = 
PhoenixStorageHandlerUtil.createColumnTypeMap
+                    (jobConf);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Column type map for TEZ : " + columnTypeMap);
+            }
+
+            String whereClause = 
jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR);
+            query = PhoenixQueryBuilder.getInstance().buildQuery(jobConf, 
tableName,
+                    ColumnProjectionUtils.getReadColumnNames(jobConf), 
whereClause, columnTypeMap);
+        } else {
+            throw new IOException(executionEngine + " execution engine 
unsupported yet.");
+        }
+
+        final QueryPlan queryPlan = getQueryPlan(jobConf, query);
+        final List<KeyRange> allSplits = queryPlan.getSplits();
+        final List<InputSplit> splits = generateSplits(jobConf, queryPlan, 
allSplits, query);
+
+        return splits.toArray(new InputSplit[splits.size()]);
+    }
+
+    private List<InputSplit> generateSplits(final JobConf jobConf, final 
QueryPlan qplan,
+                                            final List<KeyRange> splits, 
String query) throws
+            IOException {
+        Preconditions.checkNotNull(qplan);
+        Preconditions.checkNotNull(splits);
+        final List<InputSplit> psplits = 
Lists.newArrayListWithExpectedSize(splits.size());
+
+        Path[] tablePaths = 
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
+                .newJobContext(new Job(jobConf)));
+        boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+                false);
+        int scanCacheSize = 
jobConf.getInt(PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE, -1);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Generating splits with scanCacheSize : " + 
scanCacheSize);
+        }
+        String tableName = qplan
+                .getTableRef().getTable().getPhysicalName().toString();
+        HTable table = new HTable(jobConf, tableName);
+        // Create an HBase connection so region locations (data locality) can be looked up.
+        HConnection connection = HConnectionManager.createConnection(jobConf);
+        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table);
+
+        for (List<Scan> scans : qplan.getScans()) {
+            PhoenixInputSplit inputSplit;
+
+            HRegionLocation location =
+                    connection.getRegionLocation(TableName.valueOf(tableName), scans.get(0)
+                    .getStartRow(), false);
+            long regionSize = sizeCalculator.getRegionSize(location.getRegionInfo().getRegionName
+                    ());
+            String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
+
+            if (splitByStats) {
+                for (Scan aScan : scans) {
+                    if (scanCacheSize > 0) {
+                        aScan.setCaching(scanCacheSize);
+                    }
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Split for  scan : " + aScan + "with scanAttribute : " + aScan
+                                .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
+                                aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
+                                .getBatch() + "] and  regionLocation : " + regionLocation);
+                    }
+
+                    inputSplit = new PhoenixInputSplit(Lists.newArrayList(aScan), tablePaths[0],
+                            regionLocation, regionSize);
+                    inputSplit.setQuery(query);
+                    psplits.add(inputSplit);
+                }
+            } else {
+                if (scanCacheSize > 0) {
+                    for (Scan aScan : scans) {
+                        aScan.setCaching(scanCacheSize);
+                    }
+                }
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
+                            .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
+                            .size() - 1).getStopRow()));
+                    LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
+                            .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
+                            "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+                            + ", " + scans.get(0).getBatch() + "] and  regionLocation : " +
+                            regionLocation);
+
+                    for (int i = 0, limit = scans.size(); i < limit; i++) {
+                        LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
+                                .toStringBinary(scans.get(i).getAttribute
+                                        (BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
+                    }
+                }
+
+                inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation,
+                        regionSize);
+                inputSplit.setQuery(query);
+                psplits.add(inputSplit);
+            }
+        }
+
+        return psplits;
+    }
+
+    @Override
+    public RecordReader<WritableComparable, T> getRecordReader(InputSplit split, JobConf job,
+                                                               Reporter reporter) throws
+            IOException {
+        final QueryPlan queryPlan = getQueryPlan(job, ((PhoenixInputSplit) split).getQuery());
+        @SuppressWarnings("unchecked")
+        final Class<T> inputClass = (Class<T>) job.getClass(PhoenixConfigurationUtil.INPUT_CLASS,
+                PhoenixResultWritable.class);
+
+        PhoenixRecordReader<T> recordReader = new PhoenixRecordReader<T>(inputClass, job,
+                queryPlan);
+        recordReader.initialize(split);
+
+        return recordReader;
+    }
+
+    /**
+     * Returns the query plan associated with the select query.
+     */
+    private QueryPlan getQueryPlan(final Configuration configuration, String selectStatement)
+            throws IOException {
+        try {
+            final String currentScnValue = configuration.get(PhoenixConfigurationUtil
+                    .CURRENT_SCN_VALUE);
+            final Properties overridingProps = new Properties();
+            if (currentScnValue != null) {
+                overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
+            }
+            final Connection connection = PhoenixConnectionUtil.getInputConnection(configuration,
+                    overridingProps);
+            Preconditions.checkNotNull(selectStatement);
+            final Statement statement = connection.createStatement();
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Compiled query : " + selectStatement);
+            }
+
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+            // Initialize the query plan so it sets up the parallel scans
+            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+            return queryPlan;
+        } catch (Exception exception) {
+            LOG.error(String.format("Failed to get the query plan with error [%s]", exception.getMessage()));
+            throw new RuntimeException(exception);
+        }
+    }
+}

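A note on the two tuning knobs read by generateSplits() above (this is editorial commentary, not part of the patch). A minimal sketch of how a client could set them on the JobConf, assuming the rest of the storage-handler configuration is already in place; the class name and the concrete values are illustrative only:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;

    public class SplitTuningSketch {
        // Hypothetical helper, not part of the patch.
        public static JobConf tune(JobConf jobConf) {
            // true: one InputSplit per stats-derived Scan; false (the default): one split per region's scan list.
            jobConf.setBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS, true);
            // Scanner caching applied to each Scan handed to a split; -1 (the default) keeps HBase's setting.
            jobConf.setInt(PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE, 1000);
            return jobConf;
        }
    }

The property names behind these constants are defined in PhoenixStorageHandlerConstants and are normally supplied through the table's SERDEPROPERTIES/TBLPROPERTIES rather than set programmatically.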
http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputSplit.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputSplit.java
new file mode 100644
index 0000000..d76e863
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputSplit.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.mapreduce;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.phoenix.query.KeyRange;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * InputSplit implementation. Represents the data to be processed by an individual Mapper
+ */
+public class PhoenixInputSplit extends FileSplit implements InputSplit {
+
+    private List<Scan> scans;
+    private KeyRange keyRange;
+
+    private long regionSize;
+
+    // The query is carried in the split because it is not delivered in the jobConf.
+    private String query;
+
+    public PhoenixInputSplit() {
+    }
+
+    public PhoenixInputSplit(final List<Scan> scans, Path dummyPath, String regionLocation, long
+            length) {
+        super(dummyPath, 0, 0, new String[]{regionLocation});
+
+        regionSize = length;
+
+        Preconditions.checkNotNull(scans);
+        Preconditions.checkState(!scans.isEmpty());
+        this.scans = scans;
+        init();
+    }
+
+    public List<Scan> getScans() {
+        return scans;
+    }
+
+    public KeyRange getKeyRange() {
+        return keyRange;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+
+    public void setQuery(String query) {
+        this.query = query;
+    }
+
+    private void init() {
+        this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size() -
+                1).getStopRow());
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
+
+        Preconditions.checkNotNull(scans);
+        WritableUtils.writeVInt(out, scans.size());
+        for (Scan scan : scans) {
+            ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan);
+            byte[] protoScanBytes = protoScan.toByteArray();
+            WritableUtils.writeVInt(out, protoScanBytes.length);
+            out.write(protoScanBytes);
+        }
+
+        WritableUtils.writeString(out, query);
+        WritableUtils.writeVLong(out, regionSize);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+
+        int count = WritableUtils.readVInt(in);
+        scans = Lists.newArrayListWithExpectedSize(count);
+        for (int i = 0; i < count; i++) {
+            byte[] protoScanBytes = new byte[WritableUtils.readVInt(in)];
+            in.readFully(protoScanBytes);
+            ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes);
+            Scan scan = ProtobufUtil.toScan(protoScan);
+            scans.add(scan);
+        }
+        init();
+
+        query = WritableUtils.readString(in);
+        regionSize = WritableUtils.readVLong(in);
+    }
+
+    @Override
+    public long getLength() {
+        return regionSize;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+        return new String[]{};
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + keyRange.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof PhoenixInputSplit)) {
+            return false;
+        }
+        PhoenixInputSplit other = (PhoenixInputSplit) obj;
+        if (keyRange == null) {
+            if (other.keyRange != null) {
+                return false;
+            }
+        } else if (!keyRange.equals(other.keyRange)) {
+            return false;
+        }
+        return true;
+    }
+}

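As a side note on the Writable contract implemented above: the split carries its scans, the generated query string, and the region size through write()/readFields(). A minimal round-trip sketch, assuming placeholder values for the path, region location, and query (none of these names come from the patch):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Collections;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.phoenix.hive.mapreduce.PhoenixInputSplit;

    public class PhoenixInputSplitRoundTrip {
        public static void main(String[] args) throws IOException {
            PhoenixInputSplit split = new PhoenixInputSplit(
                    Collections.singletonList(new Scan()),   // scans backing the split
                    new Path("/tmp/dummy"),                   // dummy path required by FileSplit
                    "region-server-1",                        // region location hint
                    1024L);                                   // region size, reported as split length
            split.setQuery("SELECT * FROM MY_TABLE");         // query travels inside the split

            // Serialize through the Writable contract...
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            split.write(new DataOutputStream(buffer));

            // ...and rebuild it on the task side.
            PhoenixInputSplit copy = new PhoenixInputSplit();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

            System.out.println(copy.getQuery() + " / length=" + copy.getLength());
        }
    }

This is why getLength() returns the region size rather than a byte count of any file: the dummy path only satisfies the FileSplit base class, while the real payload is the scan list plus the query.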
http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixOutputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixOutputFormat.java
new file mode 100644
index 0000000..ed47176
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixOutputFormat.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.mapreduce;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.Progressable;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Custom OutputFormat to feed into Hive. Describes the output-specification for a Map-Reduce job.
+ */
+public class PhoenixOutputFormat<T extends DBWritable> implements OutputFormat<NullWritable, T>,
+        AcidOutputFormat<NullWritable, T> {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+
+    public PhoenixOutputFormat() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("PhoenixOutputFormat created");
+        }
+    }
+
+    @Override
+    public RecordWriter<NullWritable, T> getRecordWriter(FileSystem ignored, JobConf jobConf,
+                                                         String name, Progressable progress)
+            throws IOException {
+        return createRecordWriter(jobConf, new Properties());
+    }
+
+    @Override
+    public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+
+    }
+
+    @Override
+    public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter
+            (JobConf jobConf, Path finalOutPath, Class<? extends Writable> valueClass, boolean
+                    isCompressed, Properties tableProperties, Progressable progress) throws
+            IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Get RecordWriter for finalOutPath : " + finalOutPath + ", valueClass" +
+                    " : " +
+                    valueClass
+                            .getName() + ", isCompressed : " + isCompressed + ", tableProperties " +
+                    ": " + tableProperties + ", progress : " + progress);
+        }
+
+        return createRecordWriter(jobConf, new Properties());
+    }
+
+    @Override
+    public RecordUpdater getRecordUpdater(Path path, org.apache.hadoop.hive.ql.io
+            .AcidOutputFormat.Options options) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Get RecordWriter for  path : " + path + ", options : " +
+                    PhoenixStorageHandlerUtil
+                            .getOptionsValue(options));
+        }
+        return new PhoenixRecordWriter<T>(path, options);
+    }
+
+    @Override
+    public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecordWriter(Path path,
+            org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options options) throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Get RawRecordWriter for path : " + path + ", options : " +
+                    PhoenixStorageHandlerUtil.getOptionsValue(options));
+        }
+
+        return new PhoenixRecordWriter<T>(path, options);
+    }
+
+    private PhoenixRecordWriter<T> createRecordWriter(Configuration config, Properties properties) {
+        try {
+            return new PhoenixRecordWriter<T>(config, properties);
+        } catch (SQLException e) {
+            LOG.error("Error during PhoenixRecordWriter instantiation :" + e.getMessage());
+            throw new RuntimeException(e);
+        }
+    }
+}

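For orientation (again commentary, not part of the patch): Hive normally obtains writers through getHiveRecordWriter or, for ACID tables, getRecordUpdater. A rough sketch of driving the plain mapred getRecordWriter variant directly, assuming the JobConf already carries the storage-handler properties (Phoenix table name, ZooKeeper quorum) that PhoenixRecordWriter needs to open its connection; record population is elided:

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.phoenix.hive.mapreduce.PhoenixOutputFormat;
    import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable;

    public class PhoenixOutputFormatSketch {
        // Hypothetical driver, not part of the patch.
        public static void write(JobConf jobConf) throws IOException {
            PhoenixOutputFormat<PhoenixResultWritable> outputFormat = new PhoenixOutputFormat<>();

            // The FileSystem argument is ignored by this implementation; name/progress are unused here.
            RecordWriter<NullWritable, PhoenixResultWritable> writer =
                    outputFormat.getRecordWriter(null, jobConf, "phoenix-sketch", null);

            PhoenixResultWritable record = new PhoenixResultWritable();
            // ... populate the record from the Hive row before writing ...
            writer.write(NullWritable.get(), record);
            writer.close(null);
        }
    }

Note that every writer path (getRecordWriter, getHiveRecordWriter, getRawRecordWriter, getRecordUpdater) ultimately funnels into PhoenixRecordWriter, so the UPSERT behaviour is the same regardless of which entry point Hive uses.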
http://git-wip-us.apache.org/repos/asf/phoenix/blob/521b8192/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
new file mode 100644
index 0000000..72719a1
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive.mapreduce;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hive.PhoenixRowKey;
+import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.RoundRobinResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.monitoring.ReadMetricQueue;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
+
+/**
+ * RecordReader implementation that iterates over the records.
+ */
+@SuppressWarnings("rawtypes")
+public class PhoenixRecordReader<T extends DBWritable> implements
+        RecordReader<WritableComparable, T> {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
+
+    private final Configuration configuration;
+    private final QueryPlan queryPlan;
+    private WritableComparable key;
+    private T value = null;
+    private Class<T> inputClass;
+    private ResultIterator resultIterator = null;
+    private PhoenixResultSet resultSet;
+    private long readCount;
+
+    private boolean isTransactional;
+
+    public PhoenixRecordReader(Class<T> inputClass, final Configuration configuration, final
+    QueryPlan queryPlan) throws IOException {
+        this.inputClass = inputClass;
+        this.configuration = configuration;
+        this.queryPlan = queryPlan;
+
+        isTransactional = PhoenixStorageHandlerUtil.isTransactionalTable(configuration);
+    }
+
+    public void initialize(InputSplit split) throws IOException {
+        final PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
+        final List<Scan> scans = pSplit.getScans();
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Target table : " + queryPlan.getTableRef().getTable().getPhysicalName());
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans.get(0)
+                    .getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans.size() - 1)
+                    .getStopRow()));
+            LOG.debug("First scan : " + scans.get(0) + " scanAttribute : " + scans.get(0)
+                    .getAttributesMap());
+
+            for (int i = 0, limit = scans.size(); i < limit; i++) {
+                LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " +
+                        Bytes.toStringBinary(scans.get(i).getAttribute(BaseScannerRegionObserver
+                                .EXPECTED_UPPER_REGION_KEY)));
+            }
+        }
+
+        try {
+            List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size
+                    ());
+            StatementContext ctx = queryPlan.getContext();
+            ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+            String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
+            long renewScannerLeaseThreshold = queryPlan.getContext().getConnection()
+                    .getQueryServices().getRenewLeaseThresholdMilliSeconds();
+            for (Scan scan : scans) {
+                scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes
+                        .toBytes(true));
+                final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan
+                        .getContext().getConnection().getMutationState(),
+                        queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES,
+                        tableName), renewScannerLeaseThreshold);
+
+
+                PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap
+                        (tableResultIterator);
+                iterators.add(peekingResultIterator);
+            }
+            ResultIterator iterator = queryPlan.useRoundRobinIterator()
+                    ? RoundRobinResultIterator.newIterator(iterators, queryPlan)
+                    : ConcatResultIterator.newIterator(iterators);
+            if (queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
+                iterator = new SequenceResultIterator(iterator, queryPlan.getContext()
+                        .getSequenceManager());
+            }
+            this.resultIterator = iterator;
+            // Clone the row projector as it's not thread safe and would be used
+            // simultaneously by multiple threads otherwise.
+            this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector()
+                    .cloneIfNecessary(),
+                    queryPlan.getContext());
+        } catch (SQLException e) {
+            LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ", e
+                    .getMessage()));
+            Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    public boolean next(WritableComparable key, T value) throws IOException {
+        try {
+            if (!resultSet.next()) {
+                return false;
+            }
+            value.readFields(resultSet);
+
+            if (isTransactional) {
+                ((PhoenixResultWritable) value).readPrimaryKey((PhoenixRowKey) key);
+            }
+
+            ++readCount;
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Result[" + readCount + "] : " + ((PhoenixResultWritable) value)
+                        .getResultMap());
+            }
+
+            return true;
+        } catch (SQLException e) {
+            LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",
+                    e.getMessage()));
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public WritableComparable createKey() {
+        if (isTransactional) {
+            key = new PhoenixRowKey();
+        } else {
+            key = NullWritable.get();
+        }
+
+        return key;
+    }
+
+    @Override
+    public T createValue() {
+        value = ReflectionUtils.newInstance(inputClass, this.configuration);
+        return value;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Read Count : " + readCount);
+        }
+
+        if (resultIterator != null) {
+            try {
+                resultIterator.close();
+            } catch (SQLException e) {
+                LOG.error(" Error closing resultset.");
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+        return 0;
+    }
+}

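To close the loop on the reader (editorial commentary, not part of the patch): a small sketch of consuming a PhoenixRecordReader through the plain mapred RecordReader contract, which is what a Hive map/fetch task effectively does after PhoenixInputFormat.getRecordReader returns an initialized reader. The helper class and method names are hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable;

    public class PhoenixRecordReaderSketch {
        // Drains a reader obtained from PhoenixInputFormat.getRecordReader(...) using the
        // classic mapred contract: createKey/createValue once, then next(key, value) until false.
        public static long drain(RecordReader<WritableComparable, PhoenixResultWritable> reader)
                throws IOException {
            WritableComparable key = reader.createKey();   // NullWritable, or PhoenixRowKey for transactional tables
            PhoenixResultWritable value = reader.createValue();
            long rows = 0;
            try {
                while (reader.next(key, value)) {
                    rows++;                                // value now holds the current Phoenix row
                }
            } finally {
                reader.close();                            // closes the underlying ResultIterator
            }
            return rows;
        }
    }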