[SYSTEMML-914] Rework dataframe-frame, frame-dataframe converters

Similar to the rework of the dataframe-matrix converters, this patch fixes
various correctness and performance issues of the dataframe-frame
converters. This includes consistent rowID column handling, consistent
APIs, a fix of dimension analysis in the presence of an existing ID column,
exploitation of existing row IDs, avoidance of unnecessary dimension
analysis when only the number of non-zeros is unknown, efficient schema
handling, and more efficient parsing of frame inputs by exploiting
matching value types.
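
For illustration, a minimal sketch of the reworked converter API in this
patch (only the two converter signatures are taken from the diff below; the
Spark setup, the input RDD binaryBlockRDD, and the metadata object mc are
assumed to exist):

    // frame binary blocks -> DataFrame; passing a null schema defaults all
    // columns to STRING, and the result carries an explicit row-ID column
    DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(
        new SQLContext(jsc), binaryBlockRDD, mc, schema);

    // DataFrame -> frame binary blocks; containsID=true exploits the existing
    // row-ID column instead of a zipWithIndex pass, and dimension analysis is
    // skipped when rows/cols are already known (nnz is irrelevant for frames)
    JavaPairRDD<Long, FrameBlock> out = FrameRDDConverterUtils
        .dataFrameToBinaryBlock(jsc, df, mc, true);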

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/df090f2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/df090f2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/df090f2b

Branch: refs/heads/master
Commit: df090f2b1f9dd23a6fd48c5a67a95e1eb8e3ba59
Parents: ed07284
Author: Matthias Boehm <mbo...@us.ibm.com>
Authored: Fri Sep 16 01:35:54 2016 +0200
Committer: Matthias Boehm <mbo...@us.ibm.com>
Committed: Fri Sep 16 06:33:32 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLOutput.java     |  19 +-
 .../api/mlcontext/MLContextConversionUtil.java  |   5 +-
 .../spark/utils/FrameRDDConverterUtils.java     | 218 +++++++++-------
 .../spark/utils/RDDConverterUtils.java          |   6 +-
 .../sysml/runtime/util/UtilFunctions.java       |   7 +-
 .../functions/frame/FrameConverterTest.java     |  13 +-
 .../mlcontext/DataFrameConversionTest.java      | 196 ---------------
 .../mlcontext/DataFrameFrameConversionTest.java | 246 +++++++++++++++++++
 .../DataFrameMatrixConversionTest.java          | 196 +++++++++++++++
 .../functions/mlcontext/FrameTest.java          |  12 +-
 .../mlcontext/MLContextFrameTest.java           |  35 +--
 11 files changed, 623 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java
index ec2f24d..d011104 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -47,18 +47,17 @@ public class MLOutput {
        Map<String, JavaPairRDD<?,?>> _outputs;
        private Map<String, MatrixCharacteristics> _outMetadata = null;
        
+       public MLOutput(Map<String, JavaPairRDD<?,?>> outputs, Map<String, MatrixCharacteristics> outMetadata) {
+               this._outputs = outputs;
+               this._outMetadata = outMetadata;
+       }
+       
        public MatrixBlock getMatrixBlock(String varName) throws DMLRuntimeException {
                MatrixCharacteristics mc = getMatrixCharacteristics(varName);
                // The matrix block is always pushed to an RDD and then we do collect
                // We can later avoid this by returning symbol table rather than "Map<String, JavaPairRDD<MatrixIndexes,MatrixBlock>> _outputs"
-               MatrixBlock mb = SparkExecutionContext.toMatrixBlock(getBinaryBlockedRDD(varName), (int) mc.getRows(), (int) mc.getCols(), 
+               return SparkExecutionContext.toMatrixBlock(getBinaryBlockedRDD(varName), (int) mc.getRows(), (int) mc.getCols(), 
                                mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros());
-               return mb;
-       }
-
-       public MLOutput(Map<String, JavaPairRDD<?,?>> outputs, Map<String, MatrixCharacteristics> outMetadata) {
-               this._outputs = outputs;
-               this._outMetadata = outMetadata;
        }
        
        @SuppressWarnings("unchecked")
@@ -160,6 +159,8 @@ public class MLOutput {
        }
        
        public JavaRDD<String> getStringFrameRDD(String varName, String format, CSVFileFormatProperties fprop ) throws DMLRuntimeException {
+               //TODO MB: note that on construction of MLOutput only matrix binary blocks are passed, and 
+               //hence we will never find a frame binary block in the outputs.
                JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName);
                MatrixCharacteristics mcIn = getMatrixCharacteristics(varName); 
                if(format.equals("csv")) {
@@ -175,9 +176,11 @@ public class MLOutput {
        }
        
        public DataFrame getDataFrameRDD(String varName, JavaSparkContext jsc) throws DMLRuntimeException {
+               //TODO MB: note that on construction of MLOutput only matrix binary blocks are passed, and 
+               //hence we will never find a frame binary block in the outputs.
                JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName);
                MatrixCharacteristics mcIn = getMatrixCharacteristics(varName);
-               return FrameRDDConverterUtils.binaryBlockToDataFrame(binaryRDD, mcIn, jsc);
+               return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryRDD, mcIn, null);
        }
        
        public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index 5476902..e74dc53 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -1217,11 +1217,10 @@ public class MLContextConversionUtil {
                        @SuppressWarnings("unchecked")
                        JavaPairRDD<Long, FrameBlock> binaryBlockFrame = (JavaPairRDD<Long, FrameBlock>) sparkExecutionContext
                                        .getRDDHandleForFrameObject(frameObject, InputInfo.BinaryBlockInputInfo);
-                       MatrixCharacteristics matrixCharacteristics = frameObject.getMatrixCharacteristics();
+                       MatrixCharacteristics mc = frameObject.getMatrixCharacteristics();
 
                        JavaSparkContext jsc = MLContextUtil.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());
-
-                       return FrameRDDConverterUtils.binaryBlockToDataFrame(binaryBlockFrame, matrixCharacteristics, jsc);
+                       return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryBlockFrame, mc, frameObject.getSchema());
                } 
                catch (DMLRuntimeException e) {
                        throw new MLContextException("DMLRuntimeException while converting frame object to DataFrame", e);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 211f814..faf8ba1 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -42,6 +42,7 @@ import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -54,6 +55,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable;
 import org.apache.sysml.runtime.instructions.spark.data.SerText;
 import org.apache.sysml.runtime.instructions.spark.functions.ConvertFrameBlockToIJVLines;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils.DataFrameExtractIDFunction;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -326,27 +328,29 @@ public class FrameRDDConverterUtils
         * @throws DMLRuntimeException
         */
        public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-                       DataFrame df, MatrixCharacteristics mcOut, boolean containsID) 
+                       DataFrame df, MatrixCharacteristics mc, boolean containsID) 
                throws DMLRuntimeException 
        {
-               
-               if(containsID)
-                       df = df.drop(RDDConverterUtils.DF_ID_COLUMN);
-               
                //determine unknown dimensions if required
-               if( !mcOut.dimsKnown(true) ) {
+               if( !mc.dimsKnown() ) { //nnz are irrelevant here
                        JavaRDD<Row> tmp = df.javaRDD();
                        long rlen = tmp.count();
-                       long clen = containsID ? (df.columns().length - 1) : df.columns().length;
-                       mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1);
+                       long clen = df.columns().length - (containsID?1:0);
+                       mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), -1);
                }
                
-               JavaPairRDD<Row, Long> prepinput = df.javaRDD()
-                               .zipWithIndex(); //zip row index
-               
+               JavaPairRDD<Row, Long> prepinput = containsID ?
+                               df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) :
+                               df.javaRDD().zipWithIndex(); //zip row index
+
+               //convert data frame to frame schema (prepare once)
+               List<String> colnames = new ArrayList<String>();
+               List<ValueType> fschema = new ArrayList<ValueType>();
+               convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID);
+                               
                //convert rdd to binary block rdd
-               JavaPairRDD<Long, FrameBlock> out = prepinput
-                               .mapPartitionsToPair(new DataFrameToBinaryBlockFunction(mcOut));
+               JavaPairRDD<Long, FrameBlock> out = prepinput.mapPartitionsToPair(
+                               new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID));
                
                return out;
        }
@@ -359,22 +363,27 @@ public class FrameRDDConverterUtils
         * @param strict
         * @return
         */
-       public static DataFrame binaryBlockToDataFrame(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, JavaSparkContext sc)
+       public static DataFrame binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<Long,FrameBlock> in, 
+                       MatrixCharacteristics mc, List<ValueType> schema)
        {
-               List<ValueType> schema = in.first()._2().getSchema();
+               if( !mc.colsKnown() )
+                       throw new RuntimeException("Number of columns needed to convert binary block to data frame.");
                
-               //convert binary block to rows rdd (from blocks/rows)
-               JavaRDD<Row> rowRDD = in.flatMap(new BinaryBlockToDataFrameFunction());
+               //convert binary block to rows rdd 
+               JavaRDD<Row> rowRDD = in.flatMap(
+                               new BinaryBlockToDataFrameFunction());
                                
-               SQLContext sqlContext = new SQLContext(sc);
-               StructType dfSchema = convertFrameSchemaToDFSchema(schema);
-               DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
+               //create data frame schema
+               if( schema == null )
+                       schema = Collections.nCopies((int)mc.getCols(), ValueType.STRING);
+               StructType dfSchema = convertFrameSchemaToDFSchema(schema, true);
        
-               return df;
+               //rdd to data frame conversion
+               return sqlctx.createDataFrame(rowRDD, dfSchema);
        }
        
        
-       /*
+       /**
         * This function will convert Frame schema into DataFrame schema 
         * 
         *  @param      schema
@@ -382,32 +391,64 @@ public class FrameRDDConverterUtils
         *  @return
         *              Returns the DataFrame schema (StructType)
         */
-       public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema)
+       public static StructType convertFrameSchemaToDFSchema(List<ValueType> fschema, boolean containsID)
        {
-               // Generate the schema based on the string of schema
+               // generate the schema based on the string of schema
                List<StructField> fields = new ArrayList<StructField>();
                
-               int i = 1;
-               for (ValueType schema : lschema) {
-                       org.apache.spark.sql.types.DataType dataType = DataTypes.StringType;
+               // add id column type
+               if( containsID )
+                       fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, 
+                                       DataTypes.DoubleType, true));
+               
+               // add remaining types
+               int col = 1;
+               for (ValueType schema : fschema) {
+                       DataType dt = null;
                        switch(schema) {
-                               case STRING:  dataType = DataTypes.StringType; break;
-                               case DOUBLE:  dataType = DataTypes.DoubleType; break;
-                               case INT:     dataType = DataTypes.LongType; break;
-                               case BOOLEAN: dataType = DataTypes.BooleanType; break;
-                               default:
+                               case STRING:  dt = DataTypes.StringType; break;
+                               case DOUBLE:  dt = DataTypes.DoubleType; break;
+                               case INT:     dt = DataTypes.LongType; break;
+                               case BOOLEAN: dt = DataTypes.BooleanType; break;
+                               default:      dt = DataTypes.StringType;
                                        LOG.warn("Using default type String for " + schema.toString());
                        }
-                       fields.add(DataTypes.createStructField("C"+i++, dataType, true));
+                       fields.add(DataTypes.createStructField("C"+col++, dt, true));
                }
                
                return DataTypes.createStructType(fields);
        }
        
+       /**
+        * 
+        * @param dfschema
+        * @param containsID
+        * @return
+        */
+       public static void convertDFSchemaToFrameSchema(StructType dfschema, List<String> colnames, 
+                       List<ValueType> fschema, boolean containsID)
+       {
+               int off = containsID ? 1 : 0;
+               for( int i=off; i<dfschema.fields().length; i++ ) {
+                       StructField structType = dfschema.apply(i);
+                       colnames.add(structType.name());
+                       if(structType.dataType() == DataTypes.DoubleType 
+                               || structType.dataType() == DataTypes.FloatType)
+                               fschema.add(ValueType.DOUBLE);
+                       else if(structType.dataType() == DataTypes.LongType 
+                               || structType.dataType() == DataTypes.IntegerType)
+                               fschema.add(ValueType.INT);
+                       else if(structType.dataType() == DataTypes.BooleanType)
+                               fschema.add(ValueType.BOOLEAN);
+                       else
+                               fschema.add(ValueType.STRING);
+               }
+       }
+       
        /* 
         * It will return JavaRDD<Row> based on csv data input file.
         */
-       public static JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String delim, List<ValueType> schema)
+       public static JavaRDD<Row> csvToRowRDD(JavaSparkContext sc, String fnameIn, String delim, List<ValueType> schema)
        {
                // Load a text file and convert each line to a java rdd.
                JavaRDD<String> dataRdd = sc.textFile(fnameIn);
@@ -695,20 +736,29 @@ public class FrameRDDConverterUtils
                private static final long serialVersionUID = 2269315691094111843L;
 
                private long _clen = -1;
+               private List<String> _colnames = null;
+               private List<ValueType> _schema = null;
+               private boolean _containsID = false;
                private int _maxRowsPerBlock = -1;
                
-               public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc) {
+               public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, List<String> colnames, 
+                               List<ValueType> schema, boolean containsID) {
                        _clen = mc.getCols();
+                       _colnames = colnames;
+                       _schema = schema;
+                       _containsID = containsID;
                        _maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1);
                }
                
                @Override
-               public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) throws Exception {
+               public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) 
+                       throws Exception 
+               {
                        ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
 
-                       Long[] ix = new Long[1];
-                       FrameBlock[] mb = new FrameBlock[1];
-                       int iRowsInBlock = 0;
+                       long ix = -1;
+                       FrameBlock fb = null;
+                       Object[] tmprow = new Object[(int)_clen];
                        
                        while( arg0.hasNext() )
                        {
@@ -716,55 +766,40 @@ public class FrameRDDConverterUtils
                                Row row = tmp._1();
                                long rowix = tmp._2()+1;
                                
-                               if( iRowsInBlock == 0 || iRowsInBlock == _maxRowsPerBlock) {
-                                       if( iRowsInBlock == _maxRowsPerBlock )
-                                               flushBlocksToList(ix, mb, ret);
-                                       createBlocks(rowix, ix, mb, row);
-                                       iRowsInBlock = 0;
+                               if( fb == null || fb.getNumRows() == _maxRowsPerBlock) {
+                                       if( fb != null )
+                                               flushBlocksToList(ix, fb, ret);
+                                       ix = rowix;
+                                       fb = new FrameBlock(_schema, _colnames);
                                }
                                
                                //process row data
-                               Object[] parts = rowToObjectArray(row, (int)_clen, mb[0].getSchema());
-                               mb[0].appendRow(parts);
-                               iRowsInBlock++;
+                               int off = _containsID ? 1 : 0;
+                               for(int i=off; i<row.size(); i++) {
+                                       tmprow[i-off] = UtilFunctions.objectToObject(
+                                                       _schema.get(i-off), row.get(i));
+                               }
+                               fb.appendRow(tmprow);
                        }
                
                        //flush last blocks
-                       flushBlocksToList(ix, mb, ret);
+                       flushBlocksToList(ix, fb, ret);
                
                        return ret;
                }
                
-               public Object[] rowToObjectArray(Row row, int _clen, List<ValueType> schema) throws Exception {
-                       Object[] ret = new Object[_clen];
-                       for(int i = 0; i < row.length(); i++)
-                               ret[i] = UtilFunctions.objectToObject(schema.get(i), row.get(i));
-                       for(int i=row.length(); i<_clen; i++)
-                               ret[i] = "";
-                       return ret;
-               }
-
-               // Creates new state of empty column blocks for current global row index.
-               private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb, Row row)
-               {
-                       //compute row block index and number of column blocks
-                       ix[0] = new Long(rowix);
-                       
-                       List<String> columns = new ArrayList<String>();
-                       List<ValueType> schema = new ArrayList<ValueType>();
-                       for (StructField structType: row.schema().fields()) {
-                               columns.add(structType.name());
-                               if(structType.dataType() == DataTypes.DoubleType || structType.dataType() == DataTypes.FloatType)
-                                       schema.add(ValueType.DOUBLE);
-                               else if(structType.dataType() == DataTypes.LongType || structType.dataType() == DataTypes.IntegerType)
-                                       schema.add(ValueType.INT);
-                               else if(structType.dataType() == DataTypes.BooleanType)
-                                       schema.add(ValueType.BOOLEAN);
-                               else
-                                       schema.add(ValueType.STRING);
-                       }
-                       mb[0] = new FrameBlock(schema);
-                       mb[0].setColumnNames(columns);
+               /**
+                * 
+                * @param ix
+                * @param fb
+                * @param ret
+                * @throws DMLRuntimeException
+                */
+               private static void flushBlocksToList( Long ix, FrameBlock fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
+                       throws DMLRuntimeException
+               {                       
+                       if( fb != null && fb.getNumRows()>0 )
+                               ret.add(new Tuple2<Long,FrameBlock>(ix, fb));
                }
        }
 
@@ -779,14 +814,21 @@ public class FrameRDDConverterUtils
                public Iterable<Row> call(Tuple2<Long, FrameBlock> arg0)
                        throws Exception 
                {
+                       long rowIndex = arg0._1();
                        FrameBlock blk = arg0._2();
                        ArrayList<Row> ret = new ArrayList<Row>();
 
                        //handle Frame block data
-                       Iterator<Object[]> iter = blk.getObjectRowIterator();
-                       while( iter.hasNext() )
-                               ret.add(RowFactory.create(iter.next().clone()));
-                               
+                       int rows = blk.getNumRows();
+                       int cols = blk.getNumColumns();
+                       for( int i=0; i<rows; i++ ) {
+                               Object[] row = new Object[cols+1];
+                               row[0] = rowIndex++;
+                               for( int j=0; j<cols; j++ )
+                                       row[j+1] = blk.get(i, j);
+                               ret.add(RowFactory.create(row));
+                       }
+                       
                        return ret;
                }
        }
@@ -1046,13 +1088,11 @@ public class FrameRDDConverterUtils
        // Common functions
        
        // Flushes current state of filled column blocks to output list.
-       private static void flushBlocksToList( Long[] ix, FrameBlock[] mb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
+       private static void flushBlocksToList( Long[] ix, FrameBlock[] fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) 
                throws DMLRuntimeException
-       {
-               int len = ix.length;                    
-               for( int i=0; i<len; i++ )
-                       if( mb[i] != null ) {
-                               ret.add(new Tuple2<Long,FrameBlock>(ix[i],mb[i]));
-                       }       
+       {                       
+               for( int i=0; i<ix.length; i++ )
+                       if( fb[i] != null && fb[0].getNumRows()>0 )
+                               ret.add(new Tuple2<Long,FrameBlock>(ix[i], fb[i]));
        }
 }

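As a usage note on the schema handling above: the frame schema is now derived
once per conversion, rather than once per row block as the removed
createBlocks did. A small sketch with hypothetical variable names, using the
two helper signatures from the diff:

    // frame schema -> DataFrame schema, prepending the double-typed ID column
    StructType dfSchema = FrameRDDConverterUtils
        .convertFrameSchemaToDFSchema(fschema, true);

    // DataFrame schema -> frame schema and column names, skipping the ID column
    List<String> colnames = new ArrayList<String>();
    List<ValueType> fschema2 = new ArrayList<ValueType>();
    FrameRDDConverterUtils.convertDFSchemaToFrameSchema(
        df.schema(), colnames, fschema2, true);
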
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index ba1934a..3ee1ef8 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -322,7 +322,7 @@ public class RDDConverterUtils
         * @return
         * @throws DMLRuntimeException
         */
-       public static DataFrame binaryBlockToDataFrame(SQLContext sqlContext, 
+       public static DataFrame binaryBlockToDataFrame(SQLContext sqlctx, 
                        JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector)  
        {
                if( !mc.colsKnown() )
@@ -344,7 +344,7 @@ public class RDDConverterUtils
                }
                
                //rdd to data frame conversion
-               return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
+               return sqlctx.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields));
        }
        
        /**
@@ -1011,7 +1011,7 @@ public class RDDConverterUtils
        /**
         * 
         */
-       private static class DataFrameExtractIDFunction implements PairFunction<Row, Row,Long> 
+       protected static class DataFrameExtractIDFunction implements PairFunction<Row, Row,Long> 
        {
                private static final long serialVersionUID = 7438855241666363433L;
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 1ac552f..81a1e8f 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -423,8 +423,11 @@ public class UtilFunctions
         * @return
         */
        public static Object objectToObject(ValueType vt, Object in ) {
-               String str = objectToString(in);
-               return stringToObject(vt, str );
+               if( in instanceof Double && vt == ValueType.DOUBLE 
+                       || in instanceof Long && vt == ValueType.INT )
+                       return in; //quick path to avoid double parsing
+               else
+                       return stringToObject(vt, objectToString(in) );
        }
        
        /**

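The objectToObject change above replaces the unconditional string round-trip
(objectToString followed by stringToObject) with a quick path whenever the
runtime type already matches the target value type. A sketch of the effect,
with hypothetical inputs:

    // quick path: the Double matches ValueType.DOUBLE and is returned as-is
    Object a = UtilFunctions.objectToObject(ValueType.DOUBLE, Double.valueOf(7d));

    // general path: a mismatching target type still goes through the
    // string conversion
    Object b = UtilFunctions.objectToObject(ValueType.STRING, Double.valueOf(7d));
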
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index 107dee3..f0c17eb 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -519,8 +519,8 @@ public class FrameConverterTest extends AutomatedTestBase
 
                                //Create DataFrame 
                                SQLContext sqlContext = new SQLContext(sc);
-                               StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema);
-                               JavaRDD<Row> rowRDD = FrameRDDConverterUtils.getRowRDD(sc, fnameIn, separator, schema);
+                               StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema, false);
+                               JavaRDD<Row> rowRDD = FrameRDDConverterUtils.csvToRowRDD(sc, fnameIn, separator, schema);
                                DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
                                
                                JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
@@ -532,13 +532,14 @@ public class FrameConverterTest extends AutomatedTestBase
                        case BIN2DFRM: {
                                InputInfo iinfo = InputInfo.BinaryBlockInputInfo;
                                OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;
-                               JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class);
-                               JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new LongWritableFrameToLongFrameFunction());
-                               DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(rddIn2, mc, sc);
+                               JavaPairRDD<Long, FrameBlock> rddIn = sc
+                                               .hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class)
+                                               .mapToPair(new LongWritableFrameToLongFrameFunction());
+                               DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(sc), rddIn, mc, schema);
                                
                                //Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary 
                                JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-                                               .dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/)
+                                               .dataFrameToBinaryBlock(sc, df, mc, true)
                                                .mapToPair(new LongFrameToLongWritableFrameFunction());
                                rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
deleted file mode 100644
index c19865c..0000000
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.test.integration.functions.mlcontext;
-
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
-import org.junit.Test;
-import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
-import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
-import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.util.DataConverter;
-import org.apache.sysml.test.integration.AutomatedTestBase;
-import org.apache.sysml.test.integration.TestConfiguration;
-import org.apache.sysml.test.utils.TestUtils;
-
-
-public class DataFrameConversionTest extends AutomatedTestBase 
-{
-       private final static String TEST_DIR = "functions/mlcontext/";
-       private final static String TEST_NAME = "DataFrameConversion";
-       private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameConversionTest.class.getSimpleName() + "/";
-
-       private final static int  rows1 = 2245;
-       private final static int  cols1 = 745;
-       private final static int  cols2 = 1264;
-       private final static double sparsity1 = 0.9;
-       private final static double sparsity2 = 0.1;
-       private final static double eps=0.0000000001;
-
-        
-       @Override
-       public void setUp() {
-               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
-       }
-       
-       @Test
-       public void testVectorConversionSingleDense() {
-               testDataFrameConversion(true, true, true, false);
-       }
-       
-       @Test
-       public void testVectorConversionSingleDenseUnknown() {
-               testDataFrameConversion(true, true, true, true);
-       }
-       
-       @Test
-       public void testVectorConversionSingleSparse() {
-               testDataFrameConversion(true, true, false, false);
-       }
-       
-       @Test
-       public void testVectorConversionSingleSparseUnknown() {
-               testDataFrameConversion(true, true, false, true);
-       }
-       
-       @Test
-       public void testVectorConversionMultiDense() {
-               testDataFrameConversion(true, false, true, false);
-       }
-       
-       @Test
-       public void testVectorConversionMultiDenseUnknown() {
-               testDataFrameConversion(true, false, true, true);
-       }
-       
-       @Test
-       public void testVectorConversionMultiSparse() {
-               testDataFrameConversion(true, false, false, false);
-       }
-       
-       @Test
-       public void testVectorConversionMultiSparseUnknown() {
-               testDataFrameConversion(true, false, false, true);
-       }
-
-       @Test
-       public void testRowConversionSingleDense() {
-               testDataFrameConversion(false, true, true, false);
-       }
-       
-       @Test
-       public void testRowConversionSingleDenseUnknown() {
-               testDataFrameConversion(false, true, true, true);
-       }
-       
-       @Test
-       public void testRowConversionSingleSparse() {
-               testDataFrameConversion(false, true, false, false);
-       }
-       
-       @Test
-       public void testRowConversionSingleSparseUnknown() {
-               testDataFrameConversion(false, true, false, true);
-       }
-       
-       @Test
-       public void testRowConversionMultiDense() {
-               testDataFrameConversion(false, false, true, false);
-       }
-       
-       @Test
-       public void testRowConversionMultiDenseUnknown() {
-               testDataFrameConversion(false, false, true, true);
-       }
-       
-       @Test
-       public void testRowConversionMultiSparse() {
-               testDataFrameConversion(false, false, false, false);
-       }
-       
-       @Test
-       public void testRowConversionMultiSparseUnknown() {
-               testDataFrameConversion(false, false, false, true);
-       }
-       
-       /**
-        * 
-        * @param vector
-        * @param singleColBlock
-        * @param dense
-        * @param unknownDims
-        */
-       private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) {
-               boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
-               RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
-
-               SparkExecutionContext sec = null;
-               
-               try
-               {
-                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
-                       DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
-                       
-                       //generate input data and setup metadata
-                       int cols = singleColBlock ? cols1 : cols2;
-                       double sparsity = dense ? sparsity1 : sparsity2; 
-                       double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); 
-                       MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); 
-                       int blksz = ConfigurationManager.getBlocksize();
-                       MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
-                       MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
-                       
-                       //setup spark context
-                       sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
-                       JavaSparkContext sc = sec.getSparkContext();
-                       SQLContext sqlctx = new SQLContext(sc);
-                       
-                       //get binary block input rdd
-                       JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
-                       
-                       //matrix - dataframe - matrix conversion
-                       DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
-                       JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
-                       
-                       //get output matrix block
-                       MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
-                       
-                       //compare matrix blocks
-                       double[][] B = DataConverter.convertToDoubleMatrix(mbB);
-                       TestUtils.compareMatrices(A, B, rows1, cols, eps);
-               }
-               catch( Exception ex ) {
-                       throw new RuntimeException(ex);
-               }
-               finally {
-                       sec.close();
-                       DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
-                       DMLScript.rtplatform = oldPlatform;
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
new file mode 100644
index 0000000..a26cfe8
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameFrameConversionTest extends AutomatedTestBase 
+{
+       private final static String TEST_DIR = "functions/mlcontext/";
+       private final static String TEST_NAME = "DataFrameConversion";
+       private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameFrameConversionTest.class.getSimpleName() + "/";
+
+       private final static int  rows1 = 2245;
+       private final static int  cols1 = 745;
+       private final static int  cols2 = 1264;
+       private final static double sparsity1 = 0.9;
+       private final static double sparsity2 = 0.1;
+       private final static double eps=0.0000000001;
+
+        
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+       }
+
+       
+
+       @Test
+       public void testRowDoubleConversionSingleDense() {
+               testDataFrameConversion(ValueType.DOUBLE, true, true, false);
+       }
+       
+       @Test
+       public void testRowDoubleConversionSingleDenseUnknown() {
+               testDataFrameConversion(ValueType.DOUBLE, true, true, true);
+       }
+       
+       @Test
+       public void testRowDoubleConversionSingleSparse() {
+               testDataFrameConversion(ValueType.DOUBLE, true, false, false);
+       }
+       
+       @Test
+       public void testRowDoubleConversionSingleSparseUnknown() {
+               testDataFrameConversion(ValueType.DOUBLE, true, false, true);
+       }
+       
+       @Test
+       public void testRowDoubleConversionMultiDense() {
+               testDataFrameConversion(ValueType.DOUBLE, false, true, false);
+       }
+       
+       @Test
+       public void testRowDoubleConversionMultiDenseUnknown() {
+               testDataFrameConversion(ValueType.DOUBLE, false, true, true);
+       }
+       
+       @Test
+       public void testRowDoubleConversionMultiSparse() {
+               testDataFrameConversion(ValueType.DOUBLE, false, false, false);
+       }
+       
+       @Test
+       public void testRowDoubleConversionMultiSparseUnknown() {
+               testDataFrameConversion(ValueType.DOUBLE, false, false, true);
+       }
+
+       @Test
+       public void testRowStringConversionSingleDense() {
+               testDataFrameConversion(ValueType.STRING, true, true, false);
+       }
+       
+       @Test
+       public void testRowStringConversionSingleDenseUnknown() {
+               testDataFrameConversion(ValueType.STRING, true, true, true);
+       }
+       
+       @Test
+       public void testRowStringConversionSingleSparse() {
+               testDataFrameConversion(ValueType.STRING, true, false, false);
+       }
+       
+       @Test
+       public void testRowStringConversionSingleSparseUnknown() {
+               testDataFrameConversion(ValueType.STRING, true, false, true);
+       }
+       
+       @Test
+       public void testRowStringConversionMultiDense() {
+               testDataFrameConversion(ValueType.STRING, false, true, false);
+       }
+       
+       @Test
+       public void testRowStringConversionMultiDenseUnknown() {
+               testDataFrameConversion(ValueType.STRING, false, true, true);
+       }
+       
+       @Test
+       public void testRowStringConversionMultiSparse() {
+               testDataFrameConversion(ValueType.STRING, false, false, false);
+       }
+       
+       @Test
+       public void testRowStringConversionMultiSparseUnknown() {
+               testDataFrameConversion(ValueType.STRING, false, false, true);
+       }
+
+       @Test
+       public void testRowLongConversionSingleDense() {
+               testDataFrameConversion(ValueType.INT, true, true, false);
+       }
+       
+       @Test
+       public void testRowLongConversionSingleDenseUnknown() {
+               testDataFrameConversion(ValueType.INT, true, true, true);
+       }
+       
+       @Test
+       public void testRowLongConversionSingleSparse() {
+               testDataFrameConversion(ValueType.INT, true, false, false);
+       }
+       
+       @Test
+       public void testRowLongConversionSingleSparseUnknown() {
+               testDataFrameConversion(ValueType.INT, true, false, true);
+       }
+       
+       @Test
+       public void testRowLongConversionMultiDense() {
+               testDataFrameConversion(ValueType.INT, false, true, false);
+       }
+       
+       @Test
+       public void testRowLongConversionMultiDenseUnknown() {
+               testDataFrameConversion(ValueType.INT, false, true, true);
+       }
+       
+       @Test
+       public void testRowLongConversionMultiSparse() {
+               testDataFrameConversion(ValueType.INT, false, false, false);
+       }
+       
+       @Test
+       public void testRowLongConversionMultiSparseUnknown() {
+               testDataFrameConversion(ValueType.INT, false, false, true);
+       }
+       
+       /**
+        * 
+        * @param vector
+        * @param singleColBlock
+        * @param dense
+        * @param unknownDims
+        */
+       private void testDataFrameConversion(ValueType vt, boolean singleColBlock, boolean dense, boolean unknownDims) {
+               boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
+               RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+               SparkExecutionContext sec = null;
+               
+               try
+               {
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+                       DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+                       
+                       //generate input data and setup metadata
+                       int cols = singleColBlock ? cols1 : cols2;
+                       double sparsity = dense ? sparsity1 : sparsity2; 
+                       double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); 
+                       A = (vt == ValueType.INT) ? TestUtils.round(A) : A;
+                       MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); 
+                       FrameBlock fbA = DataConverter.convertToFrameBlock(mbA, vt);
+                       int blksz = ConfigurationManager.getBlocksize();
+                       MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+                       MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+                       List<ValueType> schema = Collections.nCopies(cols, vt);
+                       
+                       //setup spark context
+                       sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
+                       JavaSparkContext sc = sec.getSparkContext();
+                       SQLContext sqlctx = new SQLContext(sc);
+                       
+                       //get binary block input rdd
+                       JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);
+                       
+                       //frame - dataframe - frame conversion
+                       DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema);
+                       JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true);
+                       
+                       //get output frame block
+                       FrameBlock fbB = SparkExecutionContext.toFrameBlock(out, schema, rows1, cols);
+                       
+                       //compare frame blocks
+                       MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB); 
+                       double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+                       TestUtils.compareMatrices(A, B, rows1, cols, eps);
+               }
+               catch( Exception ex ) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       sec.close();
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+                       DMLScript.rtplatform = oldPlatform;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
new file mode 100644
index 0000000..e88a867
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.mlcontext;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
+import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+
+public class DataFrameMatrixConversionTest extends AutomatedTestBase 
+{
+       private final static String TEST_DIR = "functions/mlcontext/";
+       private final static String TEST_NAME = "DataFrameConversion";
+       private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameMatrixConversionTest.class.getSimpleName() + "/";
+
+       private final static int  rows1 = 2245;
+       private final static int  cols1 = 745;
+       private final static int  cols2 = 1264;
+       private final static double sparsity1 = 0.9;
+       private final static double sparsity2 = 0.1;
+       private final static double eps=0.0000000001;
+
+        
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
+       }
+       
+       @Test
+       public void testVectorConversionSingleDense() {
+               testDataFrameConversion(true, true, true, false);
+       }
+       
+       @Test
+       public void testVectorConversionSingleDenseUnknown() {
+               testDataFrameConversion(true, true, true, true);
+       }
+       
+       @Test
+       public void testVectorConversionSingleSparse() {
+               testDataFrameConversion(true, true, false, false);
+       }
+       
+       @Test
+       public void testVectorConversionSingleSparseUnknown() {
+               testDataFrameConversion(true, true, false, true);
+       }
+       
+       @Test
+       public void testVectorConversionMultiDense() {
+               testDataFrameConversion(true, false, true, false);
+       }
+       
+       @Test
+       public void testVectorConversionMultiDenseUnknown() {
+               testDataFrameConversion(true, false, true, true);
+       }
+       
+       @Test
+       public void testVectorConversionMultiSparse() {
+               testDataFrameConversion(true, false, false, false);
+       }
+       
+       @Test
+       public void testVectorConversionMultiSparseUnknown() {
+               testDataFrameConversion(true, false, false, true);
+       }
+
+       @Test
+       public void testRowConversionSingleDense() {
+               testDataFrameConversion(false, true, true, false);
+       }
+       
+       @Test
+       public void testRowConversionSingleDenseUnknown() {
+               testDataFrameConversion(false, true, true, true);
+       }
+       
+       @Test
+       public void testRowConversionSingleSparse() {
+               testDataFrameConversion(false, true, false, false);
+       }
+       
+       @Test
+       public void testRowConversionSingleSparseUnknown() {
+               testDataFrameConversion(false, true, false, true);
+       }
+       
+       @Test
+       public void testRowConversionMultiDense() {
+               testDataFrameConversion(false, false, true, false);
+       }
+       
+       @Test
+       public void testRowConversionMultiDenseUnknown() {
+               testDataFrameConversion(false, false, true, true);
+       }
+       
+       @Test
+       public void testRowConversionMultiSparse() {
+               testDataFrameConversion(false, false, false, false);
+       }
+       
+       @Test
+       public void testRowConversionMultiSparseUnknown() {
+               testDataFrameConversion(false, false, false, true);
+       }
+       
+       /**
+        * Converts a random input matrix to a dataframe and back, and
+        * compares the result against the original input.
+        * 
+        * @param vector encode each row as a single vector column instead of one double column per cell
+        * @param singleColBlock use a column count that fits into a single column block
+        * @param dense generate dense (sparsity 0.9) rather than sparse (sparsity 0.1) input data
+        * @param unknownDims pass empty (unknown) matrix characteristics to the dataframe-matrix converter
+        */
+       private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) {
+               boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; 
+               RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;
+
+               SparkExecutionContext sec = null;
+               
+               try
+               {
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+                       DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;
+                       
+                       //generate input data and setup metadata
+                       int cols = singleColBlock ? cols1 : cols2;
+                       double sparsity = dense ? sparsity1 : sparsity2; 
+                       double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373);
+                       MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
+                       int blksz = ConfigurationManager.getBlocksize();
+                       MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
+                       MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
+                       
+                       //setup spark context
+                       sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
+                       JavaSparkContext sc = sec.getSparkContext();
+                       SQLContext sqlctx = new SQLContext(sc);
+                       
+                       //get binary block input rdd
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);
+                       
+                       //matrix - dataframe - matrix conversion
+                       DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
+                       
+                       //get output matrix block
+                       MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1);
+                       
+                       //compare matrix blocks
+                       double[][] B = DataConverter.convertToDoubleMatrix(mbB);
+                       TestUtils.compareMatrices(A, B, rows1, cols, eps);
+               }
+               catch( Exception ex ) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       if( sec != null )
+                               sec.close();
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
+                       DMLScript.rtplatform = oldPlatform;
+               }
+       }
+}
\ No newline at end of file
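
For orientation, the round trip exercised by this new test reduces to the following sketch. This is illustrative only: it assumes an existing JavaSparkContext sc, SQLContext sqlctx, a MatrixBlock mb with matching MatrixCharacteristics mc, and a block size blksz, and it mirrors the calls above, where the trailing booleans select the vector-column layout and (presumably, per the row-ID handling in this patch) indicate that the dataframe carries the generated row-ID column:

  //matrix -> binary-block RDD
  JavaPairRDD<MatrixIndexes,MatrixBlock> in =
      SparkExecutionContext.toMatrixJavaPairRDD(sc, mb, blksz, blksz);
  //binary-block RDD -> dataframe (true: single vector column per row)
  DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc, true);
  //dataframe -> binary-block RDD (true: contains row-ID column, true: vector column)
  JavaPairRDD<MatrixIndexes,MatrixBlock> out =
      RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, true, true);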

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
index d12f6f2..11f3f02 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
@@ -226,13 +226,13 @@ public class FrameTest extends AutomatedTestBase
                {
                        //Create DataFrame for input A 
                        SQLContext sqlContext = new SQLContext(sc);
-                       StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema);
-                       JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.getRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema);
+                       StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema, false);
+                       JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema);
                        dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA);

                        //Create DataFrame for input B
-                       StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB);
-                       JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.getRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB);
+                       StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB, false);
+                       JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB);
                        dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB);
                }
 
@@ -285,7 +285,7 @@ public class FrameTest extends AutomatedTestBase
                                //Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary
                                MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, -1, -1, -1);
                                JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-                                               .dataFrameToBinaryBlock(jsc, df, mc, false)
+                                               .dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame)
                                                .mapToPair(new LongFrameToLongWritableFrameFunction());
                                rddOut.saveAsHadoopFile(output("AB"), LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass);
                        }
@@ -306,7 +306,7 @@ public class FrameTest extends AutomatedTestBase
                                //Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary
                                MatrixCharacteristics mc = new MatrixCharacteristics(cRows, cCols, -1, -1, -1);
                                JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
-                                               .dataFrameToBinaryBlock(jsc, df, mc, false)
+                                               .dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame)
                                                .mapToPair(new LongFrameToLongWritableFrameFunction());
                                rddOut.saveAsHadoopFile(fName, LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass);
                        }

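As a side note on the FrameTest changes above: convertFrameSchemaToDFSchema now takes a second flag that presumably controls whether the generated schema includes the converter's row-ID column, and the CSV helper was renamed from getRowRDD to csvToRowRDD. A minimal usage sketch under these assumptions (jsc, sqlContext, and lschema as set up in the test; the "data.csv" path and comma delimiter are placeholders standing in for input("A") and DataExpression.DEFAULT_DELIM_DELIMITER):

  //frame schema -> dataframe schema, without a row-ID column
  StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema, false);
  //CSV lines -> rows typed according to the frame schema
  JavaRDD<Row> rows = FrameRDDConverterUtils.csvToRowRDD(jsc, "data.csv", ",", lschema);
  DataFrame df = sqlContext.createDataFrame(rows, dfSchema);
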
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
index 98c8b10..deac382 100644
--- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
@@ -46,6 +46,7 @@ import org.apache.sysml.api.mlcontext.MatrixMetadata;
 import org.apache.sysml.api.mlcontext.Script;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.mlcontext.MLContextTest.CommaSeparatedValueStringToRow;
 import org.junit.After;
@@ -230,9 +231,9 @@ public class MLContextFrameTest extends AutomatedTestBase {
 
                                // Create DataFrame
                                SQLContext sqlContext = new SQLContext(sc);
-                               StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaA);
+                               StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaA, false);
                                DataFrame dataFrameA = sqlContext.createDataFrame(javaRddRowA, dfSchemaA);
-                               StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB);
+                               StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB, false);
                                DataFrame dataFrameB = sqlContext.createDataFrame(javaRddRowB, dfSchemaB);
                                if (script_type == SCRIPT_TYPE.DML)
                                        script = dml("A[2:3,2:4]=B;C=A[2:3,2:3]").in("A", dataFrameA, fmA).in("B", dataFrameB, fmB).out("A")
@@ -368,31 +369,31 @@ public class MLContextFrameTest extends AutomatedTestBase {
 
                } else if (outputType == IO_TYPE.DATAFRAME) {
 
-                       DataFrame dataFrameA = mlResults.getDataFrame("A");
+                       DataFrame dataFrameA = mlResults.getDataFrame("A").drop(RDDConverterUtils.DF_ID_COLUMN);
                        List<Row> listAOut = dataFrameA.collectAsList();

                        Row row1 = listAOut.get(0);
-                       Assert.assertEquals("Mistmatch with expected value", "1", row1.getString(0));
-                       Assert.assertEquals("Mistmatch with expected value", "Str2", row1.getString(1));
-                       Assert.assertEquals("Mistmatch with expected value", "3.0", row1.getString(2));
-                       Assert.assertEquals("Mistmatch with expected value", "true", row1.getString(3));
-
+                       Assert.assertEquals("Mismatch with expected value", "1", row1.get(0).toString());
+                       Assert.assertEquals("Mismatch with expected value", "Str2", row1.get(1).toString());
+                       Assert.assertEquals("Mismatch with expected value", "3.0", row1.get(2).toString());
+                       Assert.assertEquals("Mismatch with expected value", "true", row1.get(3).toString());
+
                        Row row2 = listAOut.get(1);
-                       Assert.assertEquals("Mistmatch with expected value", "4", row2.getString(0));
-                       Assert.assertEquals("Mistmatch with expected value", "Str12", row2.getString(1));
-                       Assert.assertEquals("Mistmatch with expected value", "13.0", row2.getString(2));
-                       Assert.assertEquals("Mistmatch with expected value", "true", row2.getString(3));
+                       Assert.assertEquals("Mismatch with expected value", "4", row2.get(0).toString());
+                       Assert.assertEquals("Mismatch with expected value", "Str12", row2.get(1).toString());
+                       Assert.assertEquals("Mismatch with expected value", "13.0", row2.get(2).toString());
+                       Assert.assertEquals("Mismatch with expected value", "true", row2.get(3).toString());

-                       DataFrame dataFrameC = mlResults.getDataFrame("C");
+                       DataFrame dataFrameC = mlResults.getDataFrame("C").drop(RDDConverterUtils.DF_ID_COLUMN);
                        List<Row> listCOut = dataFrameC.collectAsList();

                        Row row3 = listCOut.get(0);
-                       Assert.assertEquals("Mistmatch with expected value", "Str12", row3.getString(0));
-                       Assert.assertEquals("Mistmatch with expected value", "13.0", row3.getString(1));
+                       Assert.assertEquals("Mismatch with expected value", "Str12", row3.get(0).toString());
+                       Assert.assertEquals("Mismatch with expected value", "13.0", row3.get(1).toString());

                        Row row4 = listCOut.get(1);
-                       Assert.assertEquals("Mistmatch with expected value", "Str25", row4.getString(0));
-                       Assert.assertEquals("Mistmatch with expected value", "26.0", row4.getString(1));
+                       Assert.assertEquals("Mismatch with expected value", "Str25", row4.get(0).toString());
+                       Assert.assertEquals("Mismatch with expected value", "26.0", row4.get(1).toString());
                } else {
                        String[][] frameA = mlResults.getFrameAs2DStringArray("A");
                        Assert.assertEquals("Str2", frameA[0][1]);

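One practical takeaway from the MLContextFrameTest changes: dataframes returned by the reworked converters carry a generated row-ID column, exposed as RDDConverterUtils.DF_ID_COLUMN, so callers interested only in the payload columns should drop it explicitly, as the updated assertions do. A minimal sketch (mlResults and the output variable "A" as in the test; everything else is illustrative):

  //drop the converter-generated row-ID column before inspecting the rows
  DataFrame df = mlResults.getDataFrame("A").drop(RDDConverterUtils.DF_ID_COLUMN);
  for( Row r : df.collectAsList() )
      System.out.println(r); //payload columns only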