[SYSTEMML-914] Rework dataframe-frame, frame-dataframe converters Similar to the rework of dataframe-matrix converters, this patch fixes various correctness and performance issues of the dataframe-frame converters. This includes consistent rowID column handling, consistent APIs, a fix of dimension analysis with existing ID column, exploitation of existing row IDs, avoided unnecessary dimension analysis on unknown nnzs, efficient schema handling, and a more efficient parsing of frame inputs by exploiting matching value types.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/df090f2b Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/df090f2b Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/df090f2b Branch: refs/heads/master Commit: df090f2b1f9dd23a6fd48c5a67a95e1eb8e3ba59 Parents: ed07284 Author: Matthias Boehm <mbo...@us.ibm.com> Authored: Fri Sep 16 01:35:54 2016 +0200 Committer: Matthias Boehm <mbo...@us.ibm.com> Committed: Fri Sep 16 06:33:32 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/sysml/api/MLOutput.java | 19 +- .../api/mlcontext/MLContextConversionUtil.java | 5 +- .../spark/utils/FrameRDDConverterUtils.java | 218 +++++++++------- .../spark/utils/RDDConverterUtils.java | 6 +- .../sysml/runtime/util/UtilFunctions.java | 7 +- .../functions/frame/FrameConverterTest.java | 13 +- .../mlcontext/DataFrameConversionTest.java | 196 --------------- .../mlcontext/DataFrameFrameConversionTest.java | 246 +++++++++++++++++++ .../DataFrameMatrixConversionTest.java | 196 +++++++++++++++ .../functions/mlcontext/FrameTest.java | 12 +- .../mlcontext/MLContextFrameTest.java | 35 +-- 11 files changed, 623 insertions(+), 330 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/api/MLOutput.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java index ec2f24d..d011104 100644 --- a/src/main/java/org/apache/sysml/api/MLOutput.java +++ b/src/main/java/org/apache/sysml/api/MLOutput.java @@ -47,18 +47,17 @@ public class MLOutput { Map<String, JavaPairRDD<?,?>> _outputs; private Map<String, MatrixCharacteristics> _outMetadata = null; + public 
MLOutput(Map<String, JavaPairRDD<?,?>> outputs, Map<String, MatrixCharacteristics> outMetadata) { + this._outputs = outputs; + this._outMetadata = outMetadata; + } + public MatrixBlock getMatrixBlock(String varName) throws DMLRuntimeException { MatrixCharacteristics mc = getMatrixCharacteristics(varName); // The matrix block is always pushed to an RDD and then we do collect // We can later avoid this by returning symbol table rather than "Map<String, JavaPairRDD<MatrixIndexes,MatrixBlock>> _outputs" - MatrixBlock mb = SparkExecutionContext.toMatrixBlock(getBinaryBlockedRDD(varName), (int) mc.getRows(), (int) mc.getCols(), + return SparkExecutionContext.toMatrixBlock(getBinaryBlockedRDD(varName), (int) mc.getRows(), (int) mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros()); - return mb; - } - - public MLOutput(Map<String, JavaPairRDD<?,?>> outputs, Map<String, MatrixCharacteristics> outMetadata) { - this._outputs = outputs; - this._outMetadata = outMetadata; } @SuppressWarnings("unchecked") @@ -160,6 +159,8 @@ public class MLOutput { } public JavaRDD<String> getStringFrameRDD(String varName, String format, CSVFileFormatProperties fprop ) throws DMLRuntimeException { + //TODO MB: note that on construction of MLOutput only matrix binary blocks are passed, and + //hence we will never find a frame binary block in the outputs. JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName); MatrixCharacteristics mcIn = getMatrixCharacteristics(varName); if(format.equals("csv")) { @@ -175,9 +176,11 @@ public class MLOutput { } public DataFrame getDataFrameRDD(String varName, JavaSparkContext jsc) throws DMLRuntimeException { + //TODO MB: note that on construction of MLOutput only matrix binary blocks are passed, and + //hence we will never find a frame binary block in the outputs. 
JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName); MatrixCharacteristics mcIn = getMatrixCharacteristics(varName); - return FrameRDDConverterUtils.binaryBlockToDataFrame(binaryRDD, mcIn, jsc); + return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryRDD, mcIn, null); } public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java index 5476902..e74dc53 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java @@ -1217,11 +1217,10 @@ public class MLContextConversionUtil { @SuppressWarnings("unchecked") JavaPairRDD<Long, FrameBlock> binaryBlockFrame = (JavaPairRDD<Long, FrameBlock>) sparkExecutionContext .getRDDHandleForFrameObject(frameObject, InputInfo.BinaryBlockInputInfo); - MatrixCharacteristics matrixCharacteristics = frameObject.getMatrixCharacteristics(); + MatrixCharacteristics mc = frameObject.getMatrixCharacteristics(); JavaSparkContext jsc = MLContextUtil.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI()); - - return FrameRDDConverterUtils.binaryBlockToDataFrame(binaryBlockFrame, matrixCharacteristics, jsc); + return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryBlockFrame, mc, frameObject.getSchema()); } catch (DMLRuntimeException e) { throw new MLContextException("DMLRuntimeException while converting frame object to DataFrame", e); 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 211f814..faf8ba1 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -42,6 +42,7 @@ import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -54,6 +55,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable; import org.apache.sysml.runtime.instructions.spark.data.SerText; import org.apache.sysml.runtime.instructions.spark.functions.ConvertFrameBlockToIJVLines; +import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils.DataFrameExtractIDFunction; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; @@ -326,27 +328,29 @@ public class FrameRDDConverterUtils * @throws DMLRuntimeException */ public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc, - DataFrame df, MatrixCharacteristics mcOut, boolean containsID) + DataFrame df, MatrixCharacteristics mc, boolean containsID) throws DMLRuntimeException { - - if(containsID) - df = 
df.drop(RDDConverterUtils.DF_ID_COLUMN); - //determine unknown dimensions if required - if( !mcOut.dimsKnown(true) ) { + if( !mc.dimsKnown() ) { //nnz are irrelevant here JavaRDD<Row> tmp = df.javaRDD(); long rlen = tmp.count(); - long clen = containsID ? (df.columns().length - 1) : df.columns().length; - mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1); + long clen = df.columns().length - (containsID?1:0); + mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), -1); } - JavaPairRDD<Row, Long> prepinput = df.javaRDD() - .zipWithIndex(); //zip row index - + JavaPairRDD<Row, Long> prepinput = containsID ? + df.javaRDD().mapToPair(new DataFrameExtractIDFunction()) : + df.javaRDD().zipWithIndex(); //zip row index + + //convert data frame to frame schema (prepare once) + List<String> colnames = new ArrayList<String>(); + List<ValueType> fschema = new ArrayList<ValueType>(); + convertDFSchemaToFrameSchema(df.schema(), colnames, fschema, containsID); + //convert rdd to binary block rdd - JavaPairRDD<Long, FrameBlock> out = prepinput - .mapPartitionsToPair(new DataFrameToBinaryBlockFunction(mcOut)); + JavaPairRDD<Long, FrameBlock> out = prepinput.mapPartitionsToPair( + new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID)); return out; } @@ -359,22 +363,27 @@ public class FrameRDDConverterUtils * @param strict * @return */ - public static DataFrame binaryBlockToDataFrame(JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mcIn, JavaSparkContext sc) + public static DataFrame binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<Long,FrameBlock> in, + MatrixCharacteristics mc, List<ValueType> schema) { - List<ValueType> schema = in.first()._2().getSchema(); + if( !mc.colsKnown() ) + throw new RuntimeException("Number of columns needed to convert binary block to data frame."); - //convert binary block to rows rdd (from blocks/rows) - JavaRDD<Row> rowRDD = in.flatMap(new BinaryBlockToDataFrameFunction()); + //convert 
binary block to rows rdd + JavaRDD<Row> rowRDD = in.flatMap( + new BinaryBlockToDataFrameFunction()); - SQLContext sqlContext = new SQLContext(sc); - StructType dfSchema = convertFrameSchemaToDFSchema(schema); - DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema); + //create data frame schema + if( schema == null ) + schema = Collections.nCopies((int)mc.getCols(), ValueType.STRING); + StructType dfSchema = convertFrameSchemaToDFSchema(schema, true); - return df; + //rdd to data frame conversion + return sqlctx.createDataFrame(rowRDD, dfSchema); } - /* + /** * This function will convert Frame schema into DataFrame schema * * @param schema @@ -382,32 +391,64 @@ public class FrameRDDConverterUtils * @return * Returns the DataFrame schema (StructType) */ - public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema) + public static StructType convertFrameSchemaToDFSchema(List<ValueType> fschema, boolean containsID) { - // Generate the schema based on the string of schema + // generate the schema based on the string of schema List<StructField> fields = new ArrayList<StructField>(); - int i = 1; - for (ValueType schema : lschema) { - org.apache.spark.sql.types.DataType dataType = DataTypes.StringType; + // add id column type + if( containsID ) + fields.add(DataTypes.createStructField(RDDConverterUtils.DF_ID_COLUMN, + DataTypes.DoubleType, true)); + + // add remaining types + int col = 1; + for (ValueType schema : fschema) { + DataType dt = null; switch(schema) { - case STRING: dataType = DataTypes.StringType; break; - case DOUBLE: dataType = DataTypes.DoubleType; break; - case INT: dataType = DataTypes.LongType; break; - case BOOLEAN: dataType = DataTypes.BooleanType; break; - default: + case STRING: dt = DataTypes.StringType; break; + case DOUBLE: dt = DataTypes.DoubleType; break; + case INT: dt = DataTypes.LongType; break; + case BOOLEAN: dt = DataTypes.BooleanType; break; + default: dt = DataTypes.StringType; LOG.warn("Using default type 
String for " + schema.toString()); } - fields.add(DataTypes.createStructField("C"+i++, dataType, true)); + fields.add(DataTypes.createStructField("C"+col++, dt, true)); } return DataTypes.createStructType(fields); } + /** + * + * @param dfschema + * @param containsID + * @return + */ + public static void convertDFSchemaToFrameSchema(StructType dfschema, List<String> colnames, + List<ValueType> fschema, boolean containsID) + { + int off = containsID ? 1 : 0; + for( int i=off; i<dfschema.fields().length; i++ ) { + StructField structType = dfschema.apply(i); + colnames.add(structType.name()); + if(structType.dataType() == DataTypes.DoubleType + || structType.dataType() == DataTypes.FloatType) + fschema.add(ValueType.DOUBLE); + else if(structType.dataType() == DataTypes.LongType + || structType.dataType() == DataTypes.IntegerType) + fschema.add(ValueType.INT); + else if(structType.dataType() == DataTypes.BooleanType) + fschema.add(ValueType.BOOLEAN); + else + fschema.add(ValueType.STRING); + } + } + /* * It will return JavaRDD<Row> based on csv data input file. */ - public static JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String delim, List<ValueType> schema) + public static JavaRDD<Row> csvToRowRDD(JavaSparkContext sc, String fnameIn, String delim, List<ValueType> schema) { // Load a text file and convert each line to a java rdd. 
JavaRDD<String> dataRdd = sc.textFile(fnameIn); @@ -695,20 +736,29 @@ public class FrameRDDConverterUtils private static final long serialVersionUID = 2269315691094111843L; private long _clen = -1; + private List<String> _colnames = null; + private List<ValueType> _schema = null; + private boolean _containsID = false; private int _maxRowsPerBlock = -1; - public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc) { + public DataFrameToBinaryBlockFunction(MatrixCharacteristics mc, List<String> colnames, + List<ValueType> schema, boolean containsID) { _clen = mc.getCols(); + _colnames = colnames; + _schema = schema; + _containsID = containsID; _maxRowsPerBlock = Math.max((int) (FrameBlock.BUFFER_SIZE/_clen), 1); } @Override - public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) throws Exception { + public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) + throws Exception + { ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>(); - Long[] ix = new Long[1]; - FrameBlock[] mb = new FrameBlock[1]; - int iRowsInBlock = 0; + long ix = -1; + FrameBlock fb = null; + Object[] tmprow = new Object[(int)_clen]; while( arg0.hasNext() ) { @@ -716,55 +766,40 @@ public class FrameRDDConverterUtils Row row = tmp._1(); long rowix = tmp._2()+1; - if( iRowsInBlock == 0 || iRowsInBlock == _maxRowsPerBlock) { - if( iRowsInBlock == _maxRowsPerBlock ) - flushBlocksToList(ix, mb, ret); - createBlocks(rowix, ix, mb, row); - iRowsInBlock = 0; + if( fb == null || fb.getNumRows() == _maxRowsPerBlock) { + if( fb != null ) + flushBlocksToList(ix, fb, ret); + ix = rowix; + fb = new FrameBlock(_schema, _colnames); } //process row data - Object[] parts = rowToObjectArray(row, (int)_clen, mb[0].getSchema()); - mb[0].appendRow(parts); - iRowsInBlock++; + int off = _containsID ? 
1 : 0; + for(int i=off; i<row.size(); i++) { + tmprow[i-off] = UtilFunctions.objectToObject( + _schema.get(i-off), row.get(i)); + } + fb.appendRow(tmprow); } //flush last blocks - flushBlocksToList(ix, mb, ret); + flushBlocksToList(ix, fb, ret); return ret; } - public Object[] rowToObjectArray(Row row, int _clen, List<ValueType> schema) throws Exception { - Object[] ret = new Object[_clen]; - for(int i = 0; i < row.length(); i++) - ret[i] = UtilFunctions.objectToObject(schema.get(i), row.get(i)); - for(int i=row.length(); i<_clen; i++) - ret[i] = ""; - return ret; - } - - // Creates new state of empty column blocks for current global row index. - private void createBlocks(long rowix, Long[] ix, FrameBlock[] mb, Row row) - { - //compute row block index and number of column blocks - ix[0] = new Long(rowix); - - List<String> columns = new ArrayList<String>(); - List<ValueType> schema = new ArrayList<ValueType>(); - for (StructField structType: row.schema().fields()) { - columns.add(structType.name()); - if(structType.dataType() == DataTypes.DoubleType || structType.dataType() == DataTypes.FloatType) - schema.add(ValueType.DOUBLE); - else if(structType.dataType() == DataTypes.LongType || structType.dataType() == DataTypes.IntegerType) - schema.add(ValueType.INT); - else if(structType.dataType() == DataTypes.BooleanType) - schema.add(ValueType.BOOLEAN); - else - schema.add(ValueType.STRING); - } - mb[0] = new FrameBlock(schema); - mb[0].setColumnNames(columns); + /** + * + * @param ix + * @param fb + * @param ret + * @throws DMLRuntimeException + */ + private static void flushBlocksToList( Long ix, FrameBlock fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) + throws DMLRuntimeException + { + if( fb != null && fb.getNumRows()>0 ) + ret.add(new Tuple2<Long,FrameBlock>(ix, fb)); } } @@ -779,14 +814,21 @@ public class FrameRDDConverterUtils public Iterable<Row> call(Tuple2<Long, FrameBlock> arg0) throws Exception { + long rowIndex = arg0._1(); FrameBlock blk = arg0._2(); 
ArrayList<Row> ret = new ArrayList<Row>(); //handle Frame block data - Iterator<Object[]> iter = blk.getObjectRowIterator(); - while( iter.hasNext() ) - ret.add(RowFactory.create(iter.next().clone())); - + int rows = blk.getNumRows(); + int cols = blk.getNumColumns(); + for( int i=0; i<rows; i++ ) { + Object[] row = new Object[cols+1]; + row[0] = rowIndex++; + for( int j=0; j<cols; j++ ) + row[j+1] = blk.get(i, j); + ret.add(RowFactory.create(row)); + } + return ret; } } @@ -1046,13 +1088,11 @@ public class FrameRDDConverterUtils // Common functions // Flushes current state of filled column blocks to output list. - private static void flushBlocksToList( Long[] ix, FrameBlock[] mb, ArrayList<Tuple2<Long,FrameBlock>> ret ) + private static void flushBlocksToList( Long[] ix, FrameBlock[] fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) throws DMLRuntimeException - { - int len = ix.length; - for( int i=0; i<len; i++ ) - if( mb[i] != null ) { - ret.add(new Tuple2<Long,FrameBlock>(ix[i],mb[i])); - } + { + for( int i=0; i<ix.length; i++ ) + if( fb[i] != null && fb[0].getNumRows()>0 ) + ret.add(new Tuple2<Long,FrameBlock>(ix[i], fb[i])); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java index ba1934a..3ee1ef8 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java @@ -322,7 +322,7 @@ public class RDDConverterUtils * @return * @throws DMLRuntimeException */ - public static DataFrame binaryBlockToDataFrame(SQLContext sqlContext, + public static DataFrame 
binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector) { if( !mc.colsKnown() ) @@ -344,7 +344,7 @@ public class RDDConverterUtils } //rdd to data frame conversion - return sqlContext.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields)); + return sqlctx.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields)); } /** @@ -1011,7 +1011,7 @@ public class RDDConverterUtils /** * */ - private static class DataFrameExtractIDFunction implements PairFunction<Row, Row,Long> + protected static class DataFrameExtractIDFunction implements PairFunction<Row, Row,Long> { private static final long serialVersionUID = 7438855241666363433L; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java index 1ac552f..81a1e8f 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java @@ -423,8 +423,11 @@ public class UtilFunctions * @return */ public static Object objectToObject(ValueType vt, Object in ) { - String str = objectToString(in); - return stringToObject(vt, str ); + if( in instanceof Double && vt == ValueType.DOUBLE + || in instanceof Long && vt == ValueType.INT ) + return in; //quick path to avoid double parsing + else + return stringToObject(vt, objectToString(in) ); } /** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java index 107dee3..f0c17eb 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java @@ -519,8 +519,8 @@ public class FrameConverterTest extends AutomatedTestBase //Create DataFrame SQLContext sqlContext = new SQLContext(sc); - StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema); - JavaRDD<Row> rowRDD = FrameRDDConverterUtils.getRowRDD(sc, fnameIn, separator, schema); + StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema, false); + JavaRDD<Row> rowRDD = FrameRDDConverterUtils.csvToRowRDD(sc, fnameIn, separator, schema); DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema); JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils @@ -532,13 +532,14 @@ public class FrameConverterTest extends AutomatedTestBase case BIN2DFRM: { InputInfo iinfo = InputInfo.BinaryBlockInputInfo; OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; - JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class); - JavaPairRDD<Long, FrameBlock> rddIn2 = rddIn.mapToPair(new LongWritableFrameToLongFrameFunction()); - DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(rddIn2, mc, sc); + JavaPairRDD<Long, FrameBlock> rddIn = sc + .hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class) + .mapToPair(new LongWritableFrameToLongFrameFunction()); + DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(sc), rddIn, mc, schema); //Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils - .dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/) + 
.dataFrameToBinaryBlock(sc, df, mc, true) .mapToPair(new LongFrameToLongWritableFrameFunction()); rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java deleted file mode 100644 index c19865c..0000000 --- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameConversionTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.test.integration.functions.mlcontext; - -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.SQLContext; -import org.junit.Test; -import org.apache.sysml.api.DMLScript; -import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; -import org.apache.sysml.conf.ConfigurationManager; -import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory; -import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; -import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; -import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.MatrixIndexes; -import org.apache.sysml.runtime.util.DataConverter; -import org.apache.sysml.test.integration.AutomatedTestBase; -import org.apache.sysml.test.integration.TestConfiguration; -import org.apache.sysml.test.utils.TestUtils; - - -public class DataFrameConversionTest extends AutomatedTestBase -{ - private final static String TEST_DIR = "functions/mlcontext/"; - private final static String TEST_NAME = "DataFrameConversion"; - private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameConversionTest.class.getSimpleName() + "/"; - - private final static int rows1 = 2245; - private final static int cols1 = 745; - private final static int cols2 = 1264; - private final static double sparsity1 = 0.9; - private final static double sparsity2 = 0.1; - private final static double eps=0.0000000001; - - - @Override - public void setUp() { - addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"})); - } - - @Test - public void testVectorConversionSingleDense() { - testDataFrameConversion(true, true, true, false); - } - - @Test - public void testVectorConversionSingleDenseUnknown() { - 
testDataFrameConversion(true, true, true, true); - } - - @Test - public void testVectorConversionSingleSparse() { - testDataFrameConversion(true, true, false, false); - } - - @Test - public void testVectorConversionSingleSparseUnknown() { - testDataFrameConversion(true, true, false, true); - } - - @Test - public void testVectorConversionMultiDense() { - testDataFrameConversion(true, false, true, false); - } - - @Test - public void testVectorConversionMultiDenseUnknown() { - testDataFrameConversion(true, false, true, true); - } - - @Test - public void testVectorConversionMultiSparse() { - testDataFrameConversion(true, false, false, false); - } - - @Test - public void testVectorConversionMultiSparseUnknown() { - testDataFrameConversion(true, false, false, true); - } - - @Test - public void testRowConversionSingleDense() { - testDataFrameConversion(false, true, true, false); - } - - @Test - public void testRowConversionSingleDenseUnknown() { - testDataFrameConversion(false, true, true, true); - } - - @Test - public void testRowConversionSingleSparse() { - testDataFrameConversion(false, true, false, false); - } - - @Test - public void testRowConversionSingleSparseUnknown() { - testDataFrameConversion(false, true, false, true); - } - - @Test - public void testRowConversionMultiDense() { - testDataFrameConversion(false, false, true, false); - } - - @Test - public void testRowConversionMultiDenseUnknown() { - testDataFrameConversion(false, false, true, true); - } - - @Test - public void testRowConversionMultiSparse() { - testDataFrameConversion(false, false, false, false); - } - - @Test - public void testRowConversionMultiSparseUnknown() { - testDataFrameConversion(false, false, false, true); - } - - /** - * - * @param vector - * @param singleColBlock - * @param dense - * @param unknownDims - */ - private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) { - boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; - 
RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform; - - SparkExecutionContext sec = null; - - try - { - DMLScript.USE_LOCAL_SPARK_CONFIG = true; - DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; - - //generate input data and setup metadata - int cols = singleColBlock ? cols1 : cols2; - double sparsity = dense ? sparsity1 : sparsity2; - double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); - MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); - int blksz = ConfigurationManager.getBlocksize(); - MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros()); - MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1); - - //setup spark context - sec = (SparkExecutionContext) ExecutionContextFactory.createContext(); - JavaSparkContext sc = sec.getSparkContext(); - SQLContext sqlctx = new SQLContext(sc); - - //get binary block input rdd - JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz); - - //matrix - dataframe - matrix conversion - DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector); - JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector); - - //get output matrix block - MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1); - - //compare matrix blocks - double[][] B = DataConverter.convertToDoubleMatrix(mbB); - TestUtils.compareMatrices(A, B, rows1, cols, eps); - } - catch( Exception ex ) { - throw new RuntimeException(ex); - } - finally { - sec.close(); - DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig; - DMLScript.rtplatform = oldPlatform; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameFrameConversionTest.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysml.test.integration.functions.mlcontext;

import java.util.Collections;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.junit.Test;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.util.DataConverter;
import org.apache.sysml.test.integration.AutomatedTestBase;
import org.apache.sysml.test.integration.TestConfiguration;
import org.apache.sysml.test.utils.TestUtils;

/**
 * Round-trip tests of the frame-dataframe converters: a random matrix is
 * converted to a {@link FrameBlock} of a given value type, distributed as a
 * binary-block RDD, converted frame -&gt; DataFrame -&gt; frame, and the result
 * is compared element-wise against the original data. Each test varies the
 * value type (DOUBLE/STRING/INT), single vs. multi column-block width,
 * dense vs. sparse data, and known vs. unknown output dimensions.
 */
public class DataFrameFrameConversionTest extends AutomatedTestBase
{
	private final static String TEST_DIR = "functions/mlcontext/";
	private final static String TEST_NAME = "DataFrameConversion";
	private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameFrameConversionTest.class.getSimpleName() + "/";

	private final static int rows1 = 2245;
	private final static int cols1 = 745;   //single column block (<= blocksize)
	private final static int cols2 = 1264;  //multiple column blocks
	private final static double sparsity1 = 0.9;
	private final static double sparsity2 = 0.1;
	private final static double eps = 0.0000000001;

	@Override
	public void setUp() {
		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"}));
	}

	@Test
	public void testRowDoubleConversionSingleDense() {
		testDataFrameConversion(ValueType.DOUBLE, true, true, false);
	}

	@Test
	public void testRowDoubleConversionSingleDenseUnknown() {
		testDataFrameConversion(ValueType.DOUBLE, true, true, true);
	}

	@Test
	public void testRowDoubleConversionSingleSparse() {
		testDataFrameConversion(ValueType.DOUBLE, true, false, false);
	}

	@Test
	public void testRowDoubleConversionSingleSparseUnknown() {
		testDataFrameConversion(ValueType.DOUBLE, true, false, true);
	}

	@Test
	public void testRowDoubleConversionMultiDense() {
		testDataFrameConversion(ValueType.DOUBLE, false, true, false);
	}

	@Test
	public void testRowDoubleConversionMultiDenseUnknown() {
		testDataFrameConversion(ValueType.DOUBLE, false, true, true);
	}

	@Test
	public void testRowDoubleConversionMultiSparse() {
		testDataFrameConversion(ValueType.DOUBLE, false, false, false);
	}

	@Test
	public void testRowDoubleConversionMultiSparseUnknown() {
		testDataFrameConversion(ValueType.DOUBLE, false, false, true);
	}

	@Test
	public void testRowStringConversionSingleDense() {
		testDataFrameConversion(ValueType.STRING, true, true, false);
	}

	@Test
	public void testRowStringConversionSingleDenseUnknown() {
		testDataFrameConversion(ValueType.STRING, true, true, true);
	}

	@Test
	public void testRowStringConversionSingleSparse() {
		testDataFrameConversion(ValueType.STRING, true, false, false);
	}

	@Test
	public void testRowStringConversionSingleSparseUnknown() {
		testDataFrameConversion(ValueType.STRING, true, false, true);
	}

	@Test
	public void testRowStringConversionMultiDense() {
		testDataFrameConversion(ValueType.STRING, false, true, false);
	}

	@Test
	public void testRowStringConversionMultiDenseUnknown() {
		testDataFrameConversion(ValueType.STRING, false, true, true);
	}

	@Test
	public void testRowStringConversionMultiSparse() {
		testDataFrameConversion(ValueType.STRING, false, false, false);
	}

	@Test
	public void testRowStringConversionMultiSparseUnknown() {
		testDataFrameConversion(ValueType.STRING, false, false, true);
	}

	@Test
	public void testRowLongConversionSingleDense() {
		testDataFrameConversion(ValueType.INT, true, true, false);
	}

	@Test
	public void testRowLongConversionSingleDenseUnknown() {
		testDataFrameConversion(ValueType.INT, true, true, true);
	}

	@Test
	public void testRowLongConversionSingleSparse() {
		testDataFrameConversion(ValueType.INT, true, false, false);
	}

	@Test
	public void testRowLongConversionSingleSparseUnknown() {
		testDataFrameConversion(ValueType.INT, true, false, true);
	}

	@Test
	public void testRowLongConversionMultiDense() {
		testDataFrameConversion(ValueType.INT, false, true, false);
	}

	@Test
	public void testRowLongConversionMultiDenseUnknown() {
		testDataFrameConversion(ValueType.INT, false, true, true);
	}

	@Test
	public void testRowLongConversionMultiSparse() {
		testDataFrameConversion(ValueType.INT, false, false, false);
	}

	@Test
	public void testRowLongConversionMultiSparseUnknown() {
		testDataFrameConversion(ValueType.INT, false, false, true);
	}

	/**
	 * Runs one frame -&gt; DataFrame -&gt; frame round trip and compares the
	 * reconstructed data against the generated input.
	 *
	 * @param vt             value type of all frame columns (DOUBLE, STRING, or INT;
	 *                       for INT the generated data is rounded first)
	 * @param singleColBlock if true, use a width that fits a single column block
	 * @param dense          if true, generate dense data; otherwise sparse
	 * @param unknownDims    if true, pass empty matrix characteristics to the
	 *                       dataframe-frame converter to force dimension analysis
	 */
	private void testDataFrameConversion(ValueType vt, boolean singleColBlock, boolean dense, boolean unknownDims) {
		boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG;
		RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform;

		SparkExecutionContext sec = null;

		try
		{
			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
			DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK;

			//generate input data and setup metadata
			int cols = singleColBlock ? cols1 : cols2;
			double sparsity = dense ? sparsity1 : sparsity2;
			double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373);
			A = (vt == ValueType.INT) ? TestUtils.round(A) : A;
			MatrixBlock mbA = DataConverter.convertToMatrixBlock(A);
			FrameBlock fbA = DataConverter.convertToFrameBlock(mbA, vt);
			int blksz = ConfigurationManager.getBlocksize();
			MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros());
			MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1);
			List<ValueType> schema = Collections.nCopies(cols, vt);

			//setup spark context
			sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
			JavaSparkContext sc = sec.getSparkContext();
			SQLContext sqlctx = new SQLContext(sc);

			//get binary block input rdd
			JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);

			//frame - dataframe - frame conversion
			DataFrame df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema);
			JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true);

			//get output frame block
			FrameBlock fbB = SparkExecutionContext.toFrameBlock(out, schema, rows1, cols);

			//compare frame blocks (via conversion to double matrices)
			MatrixBlock mbB = DataConverter.convertToMatrixBlock(fbB);
			double[][] B = DataConverter.convertToDoubleMatrix(mbB);
			TestUtils.compareMatrices(A, B, rows1, cols, eps);
		}
		catch( Exception ex ) {
			throw new RuntimeException(ex);
		}
		finally {
			//guard against NPE when context creation itself failed,
			//which would otherwise mask the original exception
			if( sec != null )
				sec.close();
			DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig;
			DMLScript.rtplatform = oldPlatform;
		}
	}
}
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java new file mode 100644 index 0000000..e88a867 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.test.integration.functions.mlcontext; + +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.SQLContext; +import org.junit.Test; +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory; +import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.runtime.util.DataConverter; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; + + +public class DataFrameMatrixConversionTest extends AutomatedTestBase +{ + private final static String TEST_DIR = "functions/mlcontext/"; + private final static String TEST_NAME = "DataFrameConversion"; + private final static String TEST_CLASS_DIR = TEST_DIR + DataFrameMatrixConversionTest.class.getSimpleName() + "/"; + + private final static int rows1 = 2245; + private final static int cols1 = 745; + private final static int cols2 = 1264; + private final static double sparsity1 = 0.9; + private final static double sparsity2 = 0.1; + private final static double eps=0.0000000001; + + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"A", "B"})); + } + + @Test + public void testVectorConversionSingleDense() { + testDataFrameConversion(true, true, true, false); + } + + @Test + public void testVectorConversionSingleDenseUnknown() { + 
testDataFrameConversion(true, true, true, true); + } + + @Test + public void testVectorConversionSingleSparse() { + testDataFrameConversion(true, true, false, false); + } + + @Test + public void testVectorConversionSingleSparseUnknown() { + testDataFrameConversion(true, true, false, true); + } + + @Test + public void testVectorConversionMultiDense() { + testDataFrameConversion(true, false, true, false); + } + + @Test + public void testVectorConversionMultiDenseUnknown() { + testDataFrameConversion(true, false, true, true); + } + + @Test + public void testVectorConversionMultiSparse() { + testDataFrameConversion(true, false, false, false); + } + + @Test + public void testVectorConversionMultiSparseUnknown() { + testDataFrameConversion(true, false, false, true); + } + + @Test + public void testRowConversionSingleDense() { + testDataFrameConversion(false, true, true, false); + } + + @Test + public void testRowConversionSingleDenseUnknown() { + testDataFrameConversion(false, true, true, true); + } + + @Test + public void testRowConversionSingleSparse() { + testDataFrameConversion(false, true, false, false); + } + + @Test + public void testRowConversionSingleSparseUnknown() { + testDataFrameConversion(false, true, false, true); + } + + @Test + public void testRowConversionMultiDense() { + testDataFrameConversion(false, false, true, false); + } + + @Test + public void testRowConversionMultiDenseUnknown() { + testDataFrameConversion(false, false, true, true); + } + + @Test + public void testRowConversionMultiSparse() { + testDataFrameConversion(false, false, false, false); + } + + @Test + public void testRowConversionMultiSparseUnknown() { + testDataFrameConversion(false, false, false, true); + } + + /** + * + * @param vector + * @param singleColBlock + * @param dense + * @param unknownDims + */ + private void testDataFrameConversion(boolean vector, boolean singleColBlock, boolean dense, boolean unknownDims) { + boolean oldConfig = DMLScript.USE_LOCAL_SPARK_CONFIG; + 
RUNTIME_PLATFORM oldPlatform = DMLScript.rtplatform; + + SparkExecutionContext sec = null; + + try + { + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + DMLScript.rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; + + //generate input data and setup metadata + int cols = singleColBlock ? cols1 : cols2; + double sparsity = dense ? sparsity1 : sparsity2; + double[][] A = getRandomMatrix(rows1, cols, -10, 10, sparsity, 2373); + MatrixBlock mbA = DataConverter.convertToMatrixBlock(A); + int blksz = ConfigurationManager.getBlocksize(); + MatrixCharacteristics mc1 = new MatrixCharacteristics(rows1, cols, blksz, blksz, mbA.getNonZeros()); + MatrixCharacteristics mc2 = unknownDims ? new MatrixCharacteristics() : new MatrixCharacteristics(mc1); + + //setup spark context + sec = (SparkExecutionContext) ExecutionContextFactory.createContext(); + JavaSparkContext sc = sec.getSparkContext(); + SQLContext sqlctx = new SQLContext(sc); + + //get binary block input rdd + JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz); + + //matrix - dataframe - matrix conversion + DataFrame df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector); + JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector); + + //get output matrix block + MatrixBlock mbB = SparkExecutionContext.toMatrixBlock(out, rows1, cols, blksz, blksz, -1); + + //compare matrix blocks + double[][] B = DataConverter.convertToDoubleMatrix(mbB); + TestUtils.compareMatrices(A, B, rows1, cols, eps); + } + catch( Exception ex ) { + throw new RuntimeException(ex); + } + finally { + sec.close(); + DMLScript.USE_LOCAL_SPARK_CONFIG = oldConfig; + DMLScript.rtplatform = oldPlatform; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java 
---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java index d12f6f2..11f3f02 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java @@ -226,13 +226,13 @@ public class FrameTest extends AutomatedTestBase { //Create DataFrame for input A SQLContext sqlContext = new SQLContext(sc); - StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema); - JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.getRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema); + StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema, false); + JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema); dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA); //Create DataFrame for input B - StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB); - JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.getRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB); + StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB, false); + JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB); dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB); } @@ -285,7 +285,7 @@ public class FrameTest extends AutomatedTestBase //Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary MatrixCharacteristics mc = new MatrixCharacteristics(rows, cols, -1, -1, -1); JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils - .dataFrameToBinaryBlock(jsc, df, mc, false) + 
.dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame) .mapToPair(new LongFrameToLongWritableFrameFunction()); rddOut.saveAsHadoopFile(output("AB"), LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass); } @@ -306,7 +306,7 @@ public class FrameTest extends AutomatedTestBase //Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary MatrixCharacteristics mc = new MatrixCharacteristics(cRows, cCols, -1, -1, -1); JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils - .dataFrameToBinaryBlock(jsc, df, mc, false) + .dataFrameToBinaryBlock(jsc, df, mc, bFromDataFrame) .mapToPair(new LongFrameToLongWritableFrameFunction()); rddOut.saveAsHadoopFile(fName, LongWritable.class, FrameBlock.class, OutputInfo.BinaryBlockOutputInfo.outputFormatClass); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/df090f2b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java index 98c8b10..deac382 100644 --- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java +++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java @@ -46,6 +46,7 @@ import org.apache.sysml.api.mlcontext.MatrixMetadata; import org.apache.sysml.api.mlcontext.Script; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; +import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.mlcontext.MLContextTest.CommaSeparatedValueStringToRow; import org.junit.After; @@ -230,9 +231,9 @@ public 
class MLContextFrameTest extends AutomatedTestBase { // Create DataFrame SQLContext sqlContext = new SQLContext(sc); - StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaA); + StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaA, false); DataFrame dataFrameA = sqlContext.createDataFrame(javaRddRowA, dfSchemaA); - StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB); + StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB, false); DataFrame dataFrameB = sqlContext.createDataFrame(javaRddRowB, dfSchemaB); if (script_type == SCRIPT_TYPE.DML) script = dml("A[2:3,2:4]=B;C=A[2:3,2:3]").in("A", dataFrameA, fmA).in("B", dataFrameB, fmB).out("A") @@ -368,31 +369,31 @@ public class MLContextFrameTest extends AutomatedTestBase { } else if (outputType == IO_TYPE.DATAFRAME) { - DataFrame dataFrameA = mlResults.getDataFrame("A"); + DataFrame dataFrameA = mlResults.getDataFrame("A").drop(RDDConverterUtils.DF_ID_COLUMN); List<Row> listAOut = dataFrameA.collectAsList(); Row row1 = listAOut.get(0); - Assert.assertEquals("Mistmatch with expected value", "1", row1.getString(0)); - Assert.assertEquals("Mistmatch with expected value", "Str2", row1.getString(1)); - Assert.assertEquals("Mistmatch with expected value", "3.0", row1.getString(2)); - Assert.assertEquals("Mistmatch with expected value", "true", row1.getString(3)); - + Assert.assertEquals("Mistmatch with expected value", "1", row1.get(0).toString()); + Assert.assertEquals("Mistmatch with expected value", "Str2", row1.get(1).toString()); + Assert.assertEquals("Mistmatch with expected value", "3.0", row1.get(2).toString()); + Assert.assertEquals("Mistmatch with expected value", "true", row1.get(3).toString()); + Row row2 = listAOut.get(1); - Assert.assertEquals("Mistmatch with expected value", "4", row2.getString(0)); - Assert.assertEquals("Mistmatch with expected value", "Str12", row2.getString(1)); - 
Assert.assertEquals("Mistmatch with expected value", "13.0", row2.getString(2)); - Assert.assertEquals("Mistmatch with expected value", "true", row2.getString(3)); + Assert.assertEquals("Mistmatch with expected value", "4", row2.get(0).toString()); + Assert.assertEquals("Mistmatch with expected value", "Str12", row2.get(1).toString()); + Assert.assertEquals("Mistmatch with expected value", "13.0", row2.get(2).toString()); + Assert.assertEquals("Mistmatch with expected value", "true", row2.get(3).toString()); - DataFrame dataFrameC = mlResults.getDataFrame("C"); + DataFrame dataFrameC = mlResults.getDataFrame("C").drop(RDDConverterUtils.DF_ID_COLUMN); List<Row> listCOut = dataFrameC.collectAsList(); Row row3 = listCOut.get(0); - Assert.assertEquals("Mistmatch with expected value", "Str12", row3.getString(0)); - Assert.assertEquals("Mistmatch with expected value", "13.0", row3.getString(1)); + Assert.assertEquals("Mistmatch with expected value", "Str12", row3.get(0).toString()); + Assert.assertEquals("Mistmatch with expected value", "13.0", row3.get(1).toString()); Row row4 = listCOut.get(1); - Assert.assertEquals("Mistmatch with expected value", "Str25", row4.getString(0)); - Assert.assertEquals("Mistmatch with expected value", "26.0", row4.getString(1)); + Assert.assertEquals("Mistmatch with expected value", "Str25", row4.get(0)); + Assert.assertEquals("Mistmatch with expected value", "26.0", row4.get(1)); } else { String[][] frameA = mlResults.getFrameAs2DStringArray("A"); Assert.assertEquals("Str2", frameA[0][1]);