http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bfaf183/phoenix-spark/src/main/java/org/apache/phoenix/spark/SparkResultSet.java
----------------------------------------------------------------------
diff --git 
a/phoenix-spark/src/main/java/org/apache/phoenix/spark/SparkResultSet.java 
b/phoenix-spark/src/main/java/org/apache/phoenix/spark/SparkResultSet.java
new file mode 100644
index 0000000..0cb8009
--- /dev/null
+++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/SparkResultSet.java
@@ -0,0 +1,1056 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.spark;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.spark.sql.Row;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Statement;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class that exposes a List of Rows returned from a Dataset as a SQL ResultSet
+ */
+public class SparkResultSet implements ResultSet, SQLCloseable {
+
+    private int index = -1;
+    private List<Row> dataSetRows;
+    private List<String> columnNames;
+    private boolean wasNull = false;
+
+    /**
+     * Creates a result set over rows that have already been collected.
+     *
+     * @param rows the rows to iterate over, in order
+     * @param columnNames the column names; their array position determines the column index
+     */
+    public SparkResultSet(List<Row> rows, String[] columnNames) {
+        // Arrays.asList yields a fixed-size view backed by the caller's array.
+        this.columnNames = Arrays.asList(columnNames);
+        this.dataSetRows = rows;
+    }
+
+    /**
+     * Returns the row the cursor currently points at.
+     *
+     * @return the row at the current cursor index
+     * @throws SQLException if the cursor is before the first row or past the last row
+     */
+    private Row getCurrentRow() throws SQLException {
+        // Report a bad cursor position as a SQLException rather than leaking an
+        // unchecked IndexOutOfBoundsException out of the JDBC getters.
+        if (index < 0 || index >= dataSetRows.size()) {
+            throw new SQLException("Cursor is not positioned on a row (index=" + index + ")");
+        }
+        return dataSetRows.get(index);
+    }
+
+    @Override
+    public boolean absolute(int row) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void afterLast() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void beforeFirst() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void cancelRowUpdates() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {
+    }
+
+    @Override
+    public void close() throws SQLException {
+    }
+
+    @Override
+    public void deleteRow() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public int findColumn(String columnLabel) throws SQLException {
+        return columnNames.indexOf(columnLabel.toUpperCase())+1;
+    }
+
+    @Override
+    public boolean first() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Array getArray(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Array getArray(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public InputStream getAsciiStream(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public InputStream getAsciiStream(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    private void checkOpen() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    private void checkCursorState() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(int columnIndex, int scale) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public BigDecimal getBigDecimal(String columnLabel, int scale) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public InputStream getBinaryStream(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public InputStream getBinaryStream(String columnLabel) throws SQLException 
{
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Blob getBlob(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Blob getBlob(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /**
+     * Reads a boolean column of the current row.
+     *
+     * @param columnIndex 1-based column index
+     * @return the column value, or false when the column is null (see {@link #wasNull()})
+     */
+    @Override
+    public boolean getBoolean(int columnIndex) throws SQLException {
+        final int position = columnIndex - 1;
+        final Row row = getCurrentRow();
+        wasNull = row.isNullAt(position);
+        if (wasNull) {
+            return false;
+        }
+        return row.getBoolean(position);
+    }
+
+    @Override
+    public boolean getBoolean(String columnLabel) throws SQLException {
+        return getBoolean(findColumn(columnLabel));
+    }
+
+    @Override
+    public byte[] getBytes(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public byte[] getBytes(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /**
+     * Reads a byte column of the current row.
+     *
+     * @param columnIndex 1-based column index
+     * @return the column value, or 0 when the column is null (see {@link #wasNull()})
+     */
+    @Override
+    public byte getByte(int columnIndex) throws SQLException {
+        final int position = columnIndex - 1;
+        final Row row = getCurrentRow();
+        wasNull = row.isNullAt(position);
+        if (wasNull) {
+            return 0;
+        }
+        return row.getByte(position);
+    }
+
+    @Override
+    public byte getByte(String columnLabel) throws SQLException {
+        return getByte(findColumn(columnLabel));
+    }
+
+    @Override
+    public Reader getCharacterStream(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Reader getCharacterStream(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Clob getClob(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Clob getClob(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public int getConcurrency() throws SQLException {
+        return ResultSet.CONCUR_READ_ONLY;
+    }
+
+    @Override
+    public String getCursorName() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /**
+     * Reads a date column of the current row and records whether it was null.
+     *
+     * @param columnIndex 1-based column index
+     * @return whatever the row returns for that column
+     */
+    @Override
+    public Date getDate(int columnIndex) throws SQLException {
+        final int position = columnIndex - 1;
+        final Row row = getCurrentRow();
+        wasNull = row.isNullAt(position);
+        return row.getDate(position);
+    }
+
+    @Override
+    public Date getDate(String columnLabel) throws SQLException {
+        return getDate(findColumn(columnLabel));
+    }
+
+    @Override
+    public Date getDate(int columnIndex, Calendar cal) throws SQLException {
+        cal.setTime(getCurrentRow().getDate(columnIndex-1));
+        return new Date(cal.getTimeInMillis());
+    }
+
+    @Override
+    public Date getDate(String columnLabel, Calendar cal) throws SQLException {
+        return getDate(findColumn(columnLabel), cal);
+    }
+
+    /**
+     * Reads a double column of the current row.
+     *
+     * @param columnIndex 1-based column index
+     * @return the column value, or 0 when the column is null (see {@link #wasNull()})
+     */
+    @Override
+    public double getDouble(int columnIndex) throws SQLException {
+        final int position = columnIndex - 1;
+        final Row row = getCurrentRow();
+        wasNull = row.isNullAt(position);
+        if (wasNull) {
+            return 0;
+        }
+        return row.getDouble(position);
+    }
+
+    @Override
+    public double getDouble(String columnLabel) throws SQLException {
+        return getDouble(findColumn(columnLabel));
+    }
+
+    @Override
+    public int getFetchDirection() throws SQLException {
+        return ResultSet.FETCH_FORWARD;
+    }
+
+    @Override
+    public int getFetchSize() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /**
+     * Reads a float column of the current row.
+     *
+     * @param columnIndex 1-based column index
+     * @return the column value, or 0 when the column is null (see {@link #wasNull()})
+     */
+    @Override
+    public float getFloat(int columnIndex) throws SQLException {
+        final int position = columnIndex - 1;
+        final Row row = getCurrentRow();
+        wasNull = row.isNullAt(position);
+        if (wasNull) {
+            return 0;
+        }
+        return row.getFloat(position);
+    }
+
+    @Override
+    public float getFloat(String columnLabel) throws SQLException {
+        return getFloat(findColumn(columnLabel));
+    }
+
+    @Override
+    public int getHoldability() throws SQLException {
+        return ResultSet.CLOSE_CURSORS_AT_COMMIT;
+    }
+
+    /**
+     * Reads an int column of the current row.
+     *
+     * @param columnIndex 1-based column index
+     * @return the column value, or 0 when the column is null (see {@link #wasNull()})
+     */
+    @Override
+    public int getInt(int columnIndex) throws SQLException {
+        final int position = columnIndex - 1;
+        final Row row = getCurrentRow();
+        wasNull = row.isNullAt(position);
+        if (wasNull) {
+            return 0;
+        }
+        return row.getInt(position);
+    }
+
+    @Override
+    public int getInt(String columnLabel) throws SQLException {
+        return getInt(findColumn(columnLabel));
+    }
+
+    /**
+     * Reads a long column of the current row.
+     *
+     * @param columnIndex 1-based column index
+     * @return the column value, or 0 when the column is null (see {@link #wasNull()})
+     */
+    @Override
+    public long getLong(int columnIndex) throws SQLException {
+        final int position = columnIndex - 1;
+        final Row row = getCurrentRow();
+        wasNull = row.isNullAt(position);
+        if (wasNull) {
+            return 0;
+        }
+        return row.getLong(position);
+    }
+
+    @Override
+    public long getLong(String columnLabel) throws SQLException {
+        return getLong(findColumn(columnLabel));
+    }
+
+    @Override
+    public ResultSetMetaData getMetaData() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Reader getNCharacterStream(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Reader getNCharacterStream(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public NClob getNClob(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public NClob getNClob(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public String getNString(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public String getNString(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Object getObject(int columnIndex) throws SQLException {
+        return getCurrentRow().get(columnIndex-1);
+    }
+
+    @Override
+    public Object getObject(String columnLabel) throws SQLException {
+        return getObject(findColumn(columnLabel));
+    }
+
+    @Override
+    public Object getObject(int columnIndex, Map<String, Class<?>> map) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Object getObject(String columnLabel, Map<String, Class<?>> map) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Ref getRef(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Ref getRef(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public int getRow() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public RowId getRowId(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public RowId getRowId(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public SQLXML getSQLXML(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public SQLXML getSQLXML(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public short getShort(int columnIndex) throws SQLException {
+        return getCurrentRow().getShort(columnIndex-1);
+    }
+
+    @Override
+    public short getShort(String columnLabel) throws SQLException {
+        return getShort(findColumn(columnLabel));
+    }
+
+    @Override
+    public Statement getStatement() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /**
+     * Reads a string column of the current row.
+     *
+     * @param columnIndex 1-based column index
+     * @return the column value, or null when the column is null (see {@link #wasNull()})
+     */
+    @Override
+    public String getString(int columnIndex) throws SQLException {
+        final int position = columnIndex - 1;
+        final Row row = getCurrentRow();
+        wasNull = row.isNullAt(position);
+        if (wasNull) {
+            return null;
+        }
+        return row.getString(position);
+    }
+
+    @Override
+    public String getString(String columnLabel) throws SQLException {
+        return getString(findColumn(columnLabel));
+    }
+
+    @Override
+    public Time getTime(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Time getTime(String columnLabel) throws SQLException {
+        return getTime(findColumn(columnLabel));
+    }
+
+    @Override
+    public Time getTime(int columnIndex, Calendar cal) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public Time getTime(String columnLabel, Calendar cal) throws SQLException {
+        return getTime(findColumn(columnLabel),cal);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int columnIndex) throws SQLException {
+        return getCurrentRow().getTimestamp(columnIndex-1);
+    }
+
+    @Override
+    public Timestamp getTimestamp(String columnLabel) throws SQLException {
+        return getTimestamp(findColumn(columnLabel));
+    }
+
+    /**
+     * Reads a timestamp column of the current row. The calendar argument is not used.
+     *
+     * @param columnIndex 1-based column index
+     * @param cal ignored
+     * @return the same value as {@link #getTimestamp(int)} for that column
+     */
+    @Override
+    public Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException {
+        // Pass the 1-based index through unchanged: getTimestamp(int) already converts it
+        // to a 0-based position, so subtracting 1 here would read the previous column.
+        return getTimestamp(columnIndex);
+    }
+
+    @Override
+    public Timestamp getTimestamp(String columnLabel, Calendar cal) throws 
SQLException {
+        return getTimestamp(findColumn(columnLabel),cal);
+    }
+
+    @Override
+    public int getType() throws SQLException {
+        return ResultSet.TYPE_FORWARD_ONLY;
+    }
+
+    @Override
+    public URL getURL(int columnIndex) throws SQLException {
+        try {
+            return new URL(getCurrentRow().getString(columnIndex-1));
+        } catch (MalformedURLException e) {
+            throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.MALFORMED_URL).setRootCause(e)
+                    .build().buildException();
+        }
+    }
+
+    @Override
+    public URL getURL(String columnLabel) throws SQLException {
+        return getURL(findColumn(columnLabel));
+    }
+
+    @Override
+    public InputStream getUnicodeStream(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public InputStream getUnicodeStream(String columnLabel) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public void insertRow() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean isAfterLast() throws SQLException {
+        return index >= dataSetRows.size();
+    }
+
+    @Override
+    public boolean isBeforeFirst() throws SQLException {
+        return index == -1;
+    }
+
+    @Override
+    public boolean isClosed() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean isFirst() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean isLast() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean last() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void moveToCurrentRow() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void moveToInsertRow() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /**
+     * Advances the cursor by one row.
+     *
+     * @return true if the cursor now points at a row, false once it has moved past the last one
+     */
+    @Override
+    public boolean next() throws SQLException {
+        index += 1;
+        final boolean onRow = index < dataSetRows.size();
+        return onRow;
+    }
+
+    @Override
+    public boolean previous() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void refreshRow() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean relative(int rows) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean rowDeleted() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean rowInserted() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean rowUpdated() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    /**
+     * Accepts only {@link ResultSet#FETCH_FORWARD}; that request is a no-op.
+     *
+     * @param direction the requested fetch direction
+     * @throws SQLFeatureNotSupportedException for any other direction
+     */
+    @Override
+    public void setFetchDirection(int direction) throws SQLException {
+        if (direction == ResultSet.FETCH_FORWARD) {
+            return;
+        }
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void setFetchSize(int rows) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateArray(int columnIndex, Array x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateArray(String columnLabel, Array x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateAsciiStream(int columnIndex, InputStream x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateAsciiStream(String columnLabel, InputStream x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateAsciiStream(int columnIndex, InputStream x, int length) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateAsciiStream(String columnLabel, InputStream x, int 
length) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateAsciiStream(int columnIndex, InputStream x, long length) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateAsciiStream(String columnLabel, InputStream x, long 
length) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBigDecimal(int columnIndex, BigDecimal x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBigDecimal(String columnLabel, BigDecimal x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBinaryStream(int columnIndex, InputStream x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBinaryStream(String columnLabel, InputStream x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBinaryStream(int columnIndex, InputStream x, int length) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBinaryStream(String columnLabel, InputStream x, int 
length) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBinaryStream(int columnIndex, InputStream x, long 
length) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBinaryStream(String columnLabel, InputStream x, long 
length) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBlob(int columnIndex, Blob x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBlob(String columnLabel, Blob x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBlob(int columnIndex, InputStream inputStream) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBlob(String columnLabel, InputStream inputStream) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBlob(int columnIndex, InputStream inputStream, long 
length) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBlob(String columnLabel, InputStream inputStream, long 
length) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBoolean(int columnIndex, boolean x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBoolean(String columnLabel, boolean x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateByte(int columnIndex, byte x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateByte(String columnLabel, byte x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBytes(int columnIndex, byte[] x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateBytes(String columnLabel, byte[] x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateCharacterStream(int columnIndex, Reader x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateCharacterStream(String columnLabel, Reader reader) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateCharacterStream(int columnIndex, Reader x, int length) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateCharacterStream(String columnLabel, Reader reader, int 
length) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateCharacterStream(int columnIndex, Reader x, long length) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateCharacterStream(String columnLabel, Reader reader, long 
length) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateClob(int columnIndex, Clob x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateClob(String columnLabel, Clob x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateClob(int columnIndex, Reader reader) throws SQLException 
{
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateClob(String columnLabel, Reader reader) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateClob(int columnIndex, Reader reader, long length) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateClob(String columnLabel, Reader reader, long length) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateDate(int columnIndex, Date x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateDate(String columnLabel, Date x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateDouble(int columnIndex, double x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateDouble(String columnLabel, double x) throws SQLException 
{
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateFloat(int columnIndex, float x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateFloat(String columnLabel, float x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateInt(int columnIndex, int x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateInt(String columnLabel, int x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateLong(int columnIndex, long x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateLong(String columnLabel, long x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNCharacterStream(int columnIndex, Reader x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNCharacterStream(String columnLabel, Reader reader) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNCharacterStream(int columnIndex, Reader x, long length) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNCharacterStream(String columnLabel, Reader reader, long 
length) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNClob(int columnIndex, NClob nClob) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNClob(String columnLabel, NClob nClob) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNClob(int columnIndex, Reader reader) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNClob(String columnLabel, Reader reader) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNClob(int columnIndex, Reader reader, long length) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNClob(String columnLabel, Reader reader, long length) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNString(int columnIndex, String nString) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNString(String columnLabel, String nString) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNull(int columnIndex) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateNull(String columnLabel) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateObject(int columnIndex, Object x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateObject(String columnLabel, Object x) throws SQLException 
{
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateObject(int columnIndex, Object x, int scaleOrLength) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateObject(String columnLabel, Object x, int scaleOrLength) 
throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateRef(int columnIndex, Ref x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateRef(String columnLabel, Ref x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateRow() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateRowId(int columnIndex, RowId x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateRowId(String columnLabel, RowId x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateSQLXML(int columnIndex, SQLXML xmlObject) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateSQLXML(String columnLabel, SQLXML xmlObject) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateShort(int columnIndex, short x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateShort(String columnLabel, short x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateString(int columnIndex, String x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateString(String columnLabel, String x) throws SQLException 
{
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateTime(int columnIndex, Time x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateTime(String columnLabel, Time x) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateTimestamp(int columnIndex, Timestamp x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void updateTimestamp(String columnLabel, Timestamp x) throws 
SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean wasNull() throws SQLException {
+        return wasNull;
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return iface.isInstance(this);
+    }
+
+    /**
+     * Returns this object cast to the requested type.
+     *
+     * @param iface the type to unwrap to
+     * @return this instance, when it is an instance of {@code iface}
+     * @throws SQLException with code CLASS_NOT_UNWRAPPABLE when it is not
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        if (iface.isInstance(this)) {
+            return (T) this;
+        }
+        final String detail = this.getClass().getName() + " not unwrappable from " + iface.getName();
+        throw new SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE)
+                .setMessage(detail)
+                .build().buildException();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T getObject(int columnIndex, Class<T> type) throws SQLException {
+        return (T) getObject(columnIndex-1); // Just ignore type since we only support built-in types
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T getObject(String columnLabel, Class<T> type) throws SQLException {
+        return (T) getObject(columnLabel); // Just ignore type since we only support built-in types
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bfaf183/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
----------------------------------------------------------------------
diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
index 2c2c6e1..d604e0e 100644
--- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
+++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala
@@ -21,8 +21,9 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.phoenix.jdbc.PhoenixDriver
 import org.apache.phoenix.mapreduce.PhoenixInputFormat
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil
+import org.apache.phoenix.query.QueryConstants
 import org.apache.phoenix.schema.types._
-import org.apache.phoenix.util.ColumnInfo
+import org.apache.phoenix.util.{ColumnInfo, SchemaUtil}
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
@@ -148,13 +149,25 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
     }), new StructType(structFields))
   }
 
-  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo]) = {
-    columnList.map(ci => {
-      val structType = phoenixTypeToCatalystType(ci)
-      StructField(ci.getDisplayName, structType)
-    })
+  def normalizeColumnName(columnName: String) = {
+    val unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName)
+    var normalizedColumnName = ""
+    if (unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR) < 0) {
+      normalizedColumnName = unescapedColumnName
+    }
+    else {
+      // split by separator to get the column family and column name
+      val tokens = unescapedColumnName.split(QueryConstants.NAME_SEPARATOR_REGEX)
+      normalizedColumnName = if (tokens(0) == "0") tokens(1) else unescapedColumnName
+    }
+    normalizedColumnName
   }
 
+  def phoenixSchemaToCatalystSchema(columnList: Seq[ColumnInfo]) = columnList.map(ci => {
+    val structType = phoenixTypeToCatalystType(ci)
+    StructField(normalizeColumnName(ci.getColumnName), structType)
+  })
+
 
   // Lookup table for Phoenix types to Spark catalyst types
   def phoenixTypeToCatalystType(columnInfo: ColumnInfo): DataType = columnInfo.getPDataType match {
@@ -166,7 +179,7 @@ class PhoenixRDD(sc: SparkContext, table: String, columns: Seq[String],
     case t if t.isInstanceOf[PFloat] || t.isInstanceOf[PUnsignedFloat] => FloatType
     case t if t.isInstanceOf[PDouble] || t.isInstanceOf[PUnsignedDouble] => DoubleType
     // Use Spark system default precision for now (explicit to work with < 1.5)
-    case t if t.isInstanceOf[PDecimal] => 
+    case t if t.isInstanceOf[PDecimal] =>
       if (columnInfo.getPrecision == null || columnInfo.getPrecision < 0) DecimalType(38, 18) else DecimalType(columnInfo.getPrecision, columnInfo.getScale)
     case t if t.isInstanceOf[PTimestamp] || t.isInstanceOf[PUnsignedTimestamp] => TimestampType
     case t if t.isInstanceOf[PTime] || t.isInstanceOf[PUnsignedTime] => TimestampType

http://git-wip-us.apache.org/repos/asf/phoenix/blob/9bfaf183/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 143f285..eba69a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,7 +101,7 @@
     <avatica.version>1.12.0</avatica.version>
     <jettyVersion>8.1.7.v20120910</jettyVersion>
     <tephra.version>0.15.0-incubating</tephra.version>
-    <spark.version>2.0.2</spark.version>
+    <spark.version>2.3.2</spark.version>
     <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
     <stream.version>2.9.5</stream.version>

Reply via email to