[SYSTEMML-776] Upgrade Spark version

Switched DataFrame to Dataset<Row>, switched Iterable to Iterator in Spark
flat-map functions, and updated constructors for Spark 2.0.2.

Closes #340.
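
For reference, Spark 2.0 removed the Java-visible DataFrame class (DataFrame is
now only a Scala type alias for Dataset[Row]), which drives most of the
signature changes in this patch. Below is a minimal standalone sketch of the
migration pattern; the class is purely illustrative and only mirrors the
registerInput signature touched in the diff, it is not code from this commit:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public class DatasetMigrationSketch {

        // Spark 1.6 (no longer compiles against Spark 2.x):
        //   public void registerInput(String varName, DataFrame df) { ... }

        // Spark 2.0.2: Java code must reference Dataset<Row> directly.
        public void registerInput(String varName, Dataset<Row> df) {
            df.printSchema();                       // same operations as before
            System.out.println(varName + ": " + df.count() + " rows");
        }
    }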


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/a4c7be78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/a4c7be78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/a4c7be78

Branch: refs/heads/master
Commit: a4c7be78390d01a3194e726d7a184c182bd8b558
Parents: 627fdbe
Author: Glenn Weidner <gweid...@us.ibm.com>
Authored: Mon Jan 16 16:54:35 2017 -0800
Committer: Glenn Weidner <gweid...@us.ibm.com>
Committed: Mon Jan 16 16:54:35 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |  6 +-
 src/main/java/org/apache/sysml/api/MLBlock.java |  4 +-
 .../java/org/apache/sysml/api/MLContext.java    | 11 ++--
 .../java/org/apache/sysml/api/MLMatrix.java     | 15 ++---
 .../java/org/apache/sysml/api/MLOutput.java     | 11 ++--
 .../sysml/api/mlcontext/BinaryBlockFrame.java   |  9 +--
 .../sysml/api/mlcontext/BinaryBlockMatrix.java  |  9 +--
 .../org/apache/sysml/api/mlcontext/Frame.java   |  5 +-
 .../api/mlcontext/MLContextConversionUtil.java  | 25 ++++----
 .../sysml/api/mlcontext/MLContextUtil.java      | 11 ++--
 .../apache/sysml/api/mlcontext/MLResults.java   | 19 +++---
 .../org/apache/sysml/api/mlcontext/Matrix.java  | 17 ++---
 .../DataPartitionerRemoteSparkMapper.java       |  6 +-
 .../parfor/RemoteDPParForSparkWorker.java       |  4 +-
 .../parfor/RemoteParForSparkWorker.java         |  6 +-
 .../spark/AppendGSPInstruction.java             |  4 +-
 .../spark/CumulativeOffsetSPInstruction.java    |  5 +-
 .../spark/FrameIndexingSPInstruction.java       |  8 +--
 .../instructions/spark/MapmmSPInstruction.java  |  4 +-
 .../spark/MatrixAppendMSPInstruction.java       |  4 +-
 .../spark/MatrixIndexingSPInstruction.java      |  8 +--
 .../spark/MatrixReshapeSPInstruction.java       |  5 +-
 ...ReturnParameterizedBuiltinSPInstruction.java | 16 ++---
 .../instructions/spark/PMapmmSPInstruction.java |  5 +-
 .../ParameterizedBuiltinSPInstruction.java      | 17 ++---
 .../instructions/spark/PmmSPInstruction.java    |  5 +-
 .../spark/QuaternarySPInstruction.java          |  4 +-
 .../instructions/spark/RandSPInstruction.java   |  5 +-
 .../instructions/spark/ReorgSPInstruction.java  |  9 +--
 .../instructions/spark/RmmSPInstruction.java    |  5 +-
 .../spark/TernarySPInstruction.java             |  9 +--
 .../instructions/spark/Tsmm2SPInstruction.java  |  5 +-
 .../functions/ConvertFrameBlockToIJVLines.java  |  4 +-
 .../functions/ConvertMatrixBlockToIJVLines.java |  6 +-
 .../functions/ExtractBlockForBinaryReblock.java |  5 +-
 .../spark/functions/ExtractGroup.java           |  9 +--
 .../spark/functions/ExtractGroupNWeights.java   |  5 +-
 .../functions/ReplicateVectorFunction.java      |  5 +-
 .../spark/functions/SparkListener.java          |  5 +-
 .../spark/utils/FrameRDDConverterUtils.java     | 40 ++++++------
 .../spark/utils/RDDConverterUtils.java          | 34 +++++-----
 .../spark/utils/RDDConverterUtilsExt.java       | 14 ++---
 .../instructions/spark/utils/RDDSortUtils.java  | 32 +++++-----
 .../sysml/runtime/transform/GenTfMtdSPARK.java  |  4 +-
 .../org/apache/sysml/api/ml/ScriptsUtils.scala  |  2 +-
 .../functions/frame/FrameConverterTest.java     | 29 ++++++---
 .../DataFrameMatrixConversionTest.java          |  5 +-
 .../DataFrameRowFrameConversionTest.java        |  5 +-
 .../DataFrameVectorFrameConversionTest.java     |  6 +-
 .../mlcontext/DataFrameVectorScriptTest.java    |  6 +-
 .../functions/mlcontext/FrameTest.java          |  8 +--
 .../mlcontext/MLContextFrameTest.java           | 26 ++++----
 .../integration/mlcontext/MLContextTest.java    | 66 ++++++++++----------
 .../sysml/api/ml/LogisticRegressionSuite.scala  |  3 +-
 54 files changed, 319 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4bccc2b..8224b1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,9 +65,9 @@
        <properties>
                <hadoop.version>2.4.1</hadoop.version>
                <antlr.version>4.5.3</antlr.version>
-               <spark.version>1.6.0</spark.version>
-               <scala.version>2.10.5</scala.version>
-               <scala.binary.version>2.10</scala.binary.version>
+               <spark.version>2.0.2</spark.version>
+               <scala.version>2.11.8</scala.version>
+               <scala.binary.version>2.11</scala.binary.version>
                <scala.test.version>2.2.6</scala.test.version>
                <maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss 
z</maven.build.timestamp.format>
                <enableGPU>false</enableGPU>

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/MLBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLBlock.java 
b/src/main/java/org/apache/sysml/api/MLBlock.java
index 934c4b0..d569c54 100644
--- a/src/main/java/org/apache/sysml/api/MLBlock.java
+++ b/src/main/java/org/apache/sysml/api/MLBlock.java
@@ -253,8 +253,8 @@ public class MLBlock implements Row {
        public static StructType getDefaultSchemaForBinaryBlock() {
                // TODO:
                StructField[] fields = new StructField[2];
-               fields[0] = new StructField("IgnoreSchema", 
DataType.fromCaseClassString("DoubleType"), true, null);
-               fields[1] = new StructField("IgnoreSchema1", 
DataType.fromCaseClassString("DoubleType"), true, null);
+               fields[0] = new StructField("IgnoreSchema", 
DataType.fromJson("DoubleType"), true, null);
+               fields[1] = new StructField("IgnoreSchema1", 
DataType.fromJson("DoubleType"), true, null);
                return new StructType(fields);
        }
 

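As an aside on the MLBlock schema change above: the same placeholder schema can
also be built from the DataTypes constants, avoiding string-based type parsing
altogether. A standalone sketch, not part of this commit:

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class SchemaSketch {
        public static StructType defaultBinaryBlockSchema() {
            // Two placeholder double columns, mirroring getDefaultSchemaForBinaryBlock,
            // but constructed from DataTypes constants instead of parsing a type name.
            StructField[] fields = new StructField[] {
                new StructField("IgnoreSchema",  DataTypes.DoubleType, true, Metadata.empty()),
                new StructField("IgnoreSchema1", DataTypes.DoubleType, true, Metadata.empty())
            };
            return new StructType(fields);
        }
    }
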
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java 
b/src/main/java/org/apache/sysml/api/MLContext.java
index a4f25b8..417ce8d 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -35,7 +35,8 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.api.jmlc.JMLCUtils;
@@ -263,7 +264,7 @@ public class MLContext {
         * @param df the DataFrame
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public void registerInput(String varName, DataFrame df) throws 
DMLRuntimeException {
+       public void registerInput(String varName, Dataset<Row> df) throws 
DMLRuntimeException {
                registerInput(varName, df, false);
        }
        
@@ -278,7 +279,7 @@ public class MLContext {
         * @param df the DataFrame
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public void registerFrameInput(String varName, DataFrame df) throws 
DMLRuntimeException {
+       public void registerFrameInput(String varName, Dataset<Row> df) throws 
DMLRuntimeException {
                registerFrameInput(varName, df, false);
        }
        
@@ -292,7 +293,7 @@ public class MLContext {
         * @param containsID false if the DataFrame has an column ID which 
denotes the row ID.
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public void registerInput(String varName, DataFrame df, boolean 
containsID) throws DMLRuntimeException {
+       public void registerInput(String varName, Dataset<Row> df, boolean 
containsID) throws DMLRuntimeException {
                int blksz = ConfigurationManager.getBlocksize();
                MatrixCharacteristics mcOut = new MatrixCharacteristics(-1, -1, 
blksz, blksz);
                JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = RDDConverterUtils
@@ -309,7 +310,7 @@ public class MLContext {
         * @param containsID false if the DataFrame has an column ID which 
denotes the row ID.
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public void registerFrameInput(String varName, DataFrame df, boolean 
containsID) throws DMLRuntimeException {
+       public void registerFrameInput(String varName, Dataset<Row> df, boolean 
containsID) throws DMLRuntimeException {
                int blksz = ConfigurationManager.getBlocksize();
                MatrixCharacteristics mcOut = new MatrixCharacteristics(-1, -1, 
blksz, blksz);
                JavaPairRDD<Long, FrameBlock> rdd = 
FrameRDDConverterUtils.dataFrameToBinaryBlock(new JavaSparkContext(_sc), df, 
mcOut, containsID);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/MLMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLMatrix.java 
b/src/main/java/org/apache/sysml/api/MLMatrix.java
index 6019c5a..2bcfb5c 100644
--- a/src/main/java/org/apache/sysml/api/MLMatrix.java
+++ b/src/main/java/org/apache/sysml/api/MLMatrix.java
@@ -23,10 +23,11 @@ import java.util.List;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SQLContext.QueryExecution;
+import org.apache.spark.sql.execution.QueryExecution;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.hops.OptimizerUtils;
@@ -62,26 +63,26 @@ import scala.Tuple2;
  result.write("Result_small.mtx", "text")
  
  */
-public class MLMatrix extends DataFrame {
+public class MLMatrix extends Dataset<Row> {
        private static final long serialVersionUID = -7005940673916671165L;
        
        protected MatrixCharacteristics mc = null;
        protected MLContext ml = null;
        
        protected MLMatrix(SQLContext sqlContext, LogicalPlan logicalPlan, 
MLContext ml) {
-               super(sqlContext, logicalPlan);
+               super(sqlContext, logicalPlan, RowEncoder.apply(null));
                this.ml = ml;
        }
 
        protected MLMatrix(SQLContext sqlContext, QueryExecution 
queryExecution, MLContext ml) {
-               super(sqlContext, queryExecution);
+               super(sqlContext.sparkSession(), queryExecution, 
RowEncoder.apply(null));
                this.ml = ml;
        }
        
        // Only used internally to set a new MLMatrix after one of matrix 
operations.
        // Not to be used externally.
-       protected MLMatrix(DataFrame df, MatrixCharacteristics mc, MLContext 
ml) throws DMLRuntimeException {
-               super(df.sqlContext(), df.logicalPlan());
+       protected MLMatrix(Dataset<Row> df, MatrixCharacteristics mc, MLContext 
ml) throws DMLRuntimeException {
+               super(df.sqlContext(), df.logicalPlan(), 
RowEncoder.apply(null));
                this.mc = mc;
                this.ml = ml;
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java 
b/src/main/java/org/apache/sysml/api/MLOutput.java
index f760e2b..6acca68 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -24,7 +24,8 @@ import java.util.Map;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -91,7 +92,7 @@ public class MLOutput {
         * @return the DataFrame
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public DataFrame getDF(SQLContext sqlContext, String varName) throws 
DMLRuntimeException {
+       public Dataset<Row> getDF(SQLContext sqlContext, String varName) throws 
DMLRuntimeException {
                if(sqlContext == null) {
                        throw new DMLRuntimeException("SQLContext is not 
created.");
                }
@@ -111,7 +112,7 @@ public class MLOutput {
         * @return the DataFrame
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public DataFrame getDF(SQLContext sqlContext, String varName, boolean 
outputVector) throws DMLRuntimeException {
+       public Dataset<Row> getDF(SQLContext sqlContext, String varName, 
boolean outputVector) throws DMLRuntimeException {
                if(sqlContext == null) {
                        throw new DMLRuntimeException("SQLContext is not 
created.");
                }
@@ -137,7 +138,7 @@ public class MLOutput {
         * @return the DataFrame
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public DataFrame getDF(SQLContext sqlContext, String varName, 
MatrixCharacteristics mc) 
+       public Dataset<Row> getDF(SQLContext sqlContext, String varName, 
MatrixCharacteristics mc) 
                throws DMLRuntimeException 
        {
                if(sqlContext == null)
@@ -173,7 +174,7 @@ public class MLOutput {
                
        }
        
-       public DataFrame getDataFrameRDD(String varName, JavaSparkContext jsc) 
throws DMLRuntimeException {
+       public Dataset<Row> getDataFrameRDD(String varName, JavaSparkContext 
jsc) throws DMLRuntimeException {
                JavaPairRDD<Long, FrameBlock> binaryRDD = 
getFrameBinaryBlockedRDD(varName);
                MatrixCharacteristics mcIn = getMatrixCharacteristics(varName);
                return FrameRDDConverterUtils.binaryBlockToDataFrame(new 
SQLContext(jsc), binaryRDD, mcIn, null);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockFrame.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockFrame.java 
b/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockFrame.java
index 5812cf6..e36dfb4 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockFrame.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockFrame.java
@@ -20,7 +20,8 @@
 package org.apache.sysml.api.mlcontext;
 
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -45,7 +46,7 @@ public class BinaryBlockFrame {
         * @param frameMetadata
         *            frame metadata, such as number of rows and columns
         */
-       public BinaryBlockFrame(DataFrame dataFrame, FrameMetadata 
frameMetadata) {
+       public BinaryBlockFrame(Dataset<Row> dataFrame, FrameMetadata 
frameMetadata) {
                this.frameMetadata = frameMetadata;
                binaryBlocks = 
MLContextConversionUtil.dataFrameToFrameBinaryBlocks(dataFrame, frameMetadata);
        }
@@ -61,7 +62,7 @@ public class BinaryBlockFrame {
         * @param numCols
         *            the number of columns
         */
-       public BinaryBlockFrame(DataFrame dataFrame, long numRows, long 
numCols) {
+       public BinaryBlockFrame(Dataset<Row> dataFrame, long numRows, long 
numCols) {
                this(dataFrame, new FrameMetadata(numRows, numCols, 
                                ConfigurationManager.getBlocksize(), 
ConfigurationManager.getBlocksize()));
        }
@@ -72,7 +73,7 @@ public class BinaryBlockFrame {
         * @param dataFrame
         *            the Spark DataFrame
         */
-       public BinaryBlockFrame(DataFrame dataFrame) {
+       public BinaryBlockFrame(Dataset<Row> dataFrame) {
                this(dataFrame, new FrameMetadata());
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockMatrix.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockMatrix.java 
b/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockMatrix.java
index abbdcc0..abfad09 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockMatrix.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockMatrix.java
@@ -20,7 +20,8 @@
 package org.apache.sysml.api.mlcontext;
 
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
@@ -45,7 +46,7 @@ public class BinaryBlockMatrix {
         * @param matrixMetadata
         *            matrix metadata, such as number of rows and columns
         */
-       public BinaryBlockMatrix(DataFrame dataFrame, MatrixMetadata 
matrixMetadata) {
+       public BinaryBlockMatrix(Dataset<Row> dataFrame, MatrixMetadata 
matrixMetadata) {
                this.matrixMetadata = matrixMetadata;
                binaryBlocks = 
MLContextConversionUtil.dataFrameToMatrixBinaryBlocks(dataFrame, 
matrixMetadata);
        }
@@ -61,7 +62,7 @@ public class BinaryBlockMatrix {
         * @param numCols
         *            the number of columns
         */
-       public BinaryBlockMatrix(DataFrame dataFrame, long numRows, long 
numCols) {
+       public BinaryBlockMatrix(Dataset<Row> dataFrame, long numRows, long 
numCols) {
                this(dataFrame, new MatrixMetadata(numRows, numCols, 
                                ConfigurationManager.getBlocksize(), 
ConfigurationManager.getBlocksize()));
        }
@@ -72,7 +73,7 @@ public class BinaryBlockMatrix {
         * @param dataFrame
         *            the Spark DataFrame
         */
-       public BinaryBlockMatrix(DataFrame dataFrame) {
+       public BinaryBlockMatrix(Dataset<Row> dataFrame) {
                this(dataFrame, new MatrixMetadata());
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/Frame.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/Frame.java 
b/src/main/java/org/apache/sysml/api/mlcontext/Frame.java
index 85dca64..6ca911d 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/Frame.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/Frame.java
@@ -21,7 +21,8 @@ package org.apache.sysml.api.mlcontext;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 
@@ -98,7 +99,7 @@ public class Frame {
         * 
         * @return the frame as a {@code DataFrame}
         */
-       public DataFrame toDF() {
+       public Dataset<Row> toDF() {
                return 
MLContextConversionUtil.frameObjectToDataFrame(frameObject, 
sparkExecutionContext);
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java 
b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index 6e3a19a..ca853ef 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -35,7 +35,8 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.mllib.linalg.VectorUDT;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -306,7 +307,7 @@ public class MLContextConversionUtil {
         * @return the {@code DataFrame} matrix converted to a converted to a
         *         {@code MatrixObject}
         */
-       public static MatrixObject dataFrameToMatrixObject(String variableName, 
DataFrame dataFrame) {
+       public static MatrixObject dataFrameToMatrixObject(String variableName, 
Dataset<Row> dataFrame) {
                return dataFrameToMatrixObject(variableName, dataFrame, null);
        }
 
@@ -322,7 +323,7 @@ public class MLContextConversionUtil {
         * @return the {@code DataFrame} matrix converted to a converted to a
         *         {@code MatrixObject}
         */
-       public static MatrixObject dataFrameToMatrixObject(String variableName, 
DataFrame dataFrame,
+       public static MatrixObject dataFrameToMatrixObject(String variableName, 
Dataset<Row> dataFrame,
                MatrixMetadata matrixMetadata) 
        {
                matrixMetadata = (matrixMetadata!=null) ? matrixMetadata : new 
MatrixMetadata();
@@ -340,7 +341,7 @@ public class MLContextConversionUtil {
         * @return the {@code DataFrame} matrix converted to a converted to a
         *         {@code FrameObject}
         */
-       public static FrameObject dataFrameToFrameObject(String variableName, 
DataFrame dataFrame) {
+       public static FrameObject dataFrameToFrameObject(String variableName, 
Dataset<Row> dataFrame) {
                return dataFrameToFrameObject(variableName, dataFrame, null);
        }
 
@@ -356,7 +357,7 @@ public class MLContextConversionUtil {
         * @return the {@code DataFrame} frame converted to a converted to a
         *         {@code FrameObject}
         */
-       public static FrameObject dataFrameToFrameObject(String variableName, 
DataFrame dataFrame, FrameMetadata frameMetadata) 
+       public static FrameObject dataFrameToFrameObject(String variableName, 
Dataset<Row> dataFrame, FrameMetadata frameMetadata) 
        {
                try {
                        //setup meta data and java spark context
@@ -395,7 +396,7 @@ public class MLContextConversionUtil {
         *         {@code JavaPairRDD<MatrixIndexes,
         *         MatrixBlock>} binary-block matrix
         */
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
dataFrameToMatrixBinaryBlocks(DataFrame dataFrame) {
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
dataFrameToMatrixBinaryBlocks(Dataset<Row> dataFrame) {
                return dataFrameToMatrixBinaryBlocks(dataFrame, null);
        }
 
@@ -412,7 +413,7 @@ public class MLContextConversionUtil {
         *         MatrixBlock>} binary-block matrix
         */
        public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
dataFrameToMatrixBinaryBlocks(
-                       DataFrame dataFrame, MatrixMetadata matrixMetadata) 
+                       Dataset<Row> dataFrame, MatrixMetadata matrixMetadata) 
        {
                //handle meta data
                determineMatrixFormatIfNeeded(dataFrame, matrixMetadata);
@@ -447,7 +448,7 @@ public class MLContextConversionUtil {
         *         {@code JavaPairRDD<Long,
         *         FrameBlock>} binary-block frame
         */
-       public static JavaPairRDD<Long, FrameBlock> 
dataFrameToFrameBinaryBlocks(DataFrame dataFrame,
+       public static JavaPairRDD<Long, FrameBlock> 
dataFrameToFrameBinaryBlocks(Dataset<Row> dataFrame,
                        FrameMetadata frameMetadata) {
                throw new MLContextException("dataFrameToFrameBinaryBlocks is 
unimplemented");
        }
@@ -461,7 +462,7 @@ public class MLContextConversionUtil {
         * @param matrixMetadata
         *            the matrix metadata, if available
         */
-       public static void determineMatrixFormatIfNeeded(DataFrame dataFrame, 
MatrixMetadata matrixMetadata) {
+       public static void determineMatrixFormatIfNeeded(Dataset<Row> 
dataFrame, MatrixMetadata matrixMetadata) {
                MatrixFormat matrixFormat = matrixMetadata.getMatrixFormat();
                if (matrixFormat != null) {
                        return;
@@ -505,7 +506,7 @@ public class MLContextConversionUtil {
         * @param frameMetadata
         *            the frame metadata, if available
         */
-       public static void determineFrameFormatIfNeeded(DataFrame dataFrame, 
FrameMetadata frameMetadata) {
+       public static void determineFrameFormatIfNeeded(Dataset<Row> dataFrame, 
FrameMetadata frameMetadata) {
                FrameFormat frameFormat = frameMetadata.getFrameFormat();
                if (frameFormat != null) {
                        return;
@@ -1229,7 +1230,7 @@ public class MLContextConversionUtil {
         * @param isVectorDF is the DataFrame a vector DataFrame?
         * @return the {@code MatrixObject} converted to a {@code DataFrame}
         */
-       public static DataFrame matrixObjectToDataFrame(MatrixObject 
matrixObject,
+       public static Dataset<Row> matrixObjectToDataFrame(MatrixObject 
matrixObject,
                        SparkExecutionContext sparkExecutionContext, boolean 
isVectorDF) {
                try {
                        @SuppressWarnings("unchecked")
@@ -1256,7 +1257,7 @@ public class MLContextConversionUtil {
         *            the Spark execution context
         * @return the {@code FrameObject} converted to a {@code DataFrame}
         */
-       public static DataFrame frameObjectToDataFrame(FrameObject frameObject,
+       public static Dataset<Row> frameObjectToDataFrame(FrameObject 
frameObject,
                        SparkExecutionContext sparkExecutionContext) {
                try {
                        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java 
b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index b37654c..74e17ee 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -40,7 +40,8 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.mllib.linalg.VectorUDT;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
@@ -79,7 +80,7 @@ public final class MLContextUtil {
         * Complex data types supported by the MLContext API
         */
        @SuppressWarnings("rawtypes")
-       public static final Class[] COMPLEX_DATA_TYPES = { JavaRDD.class, 
RDD.class, DataFrame.class,
+       public static final Class[] COMPLEX_DATA_TYPES = { JavaRDD.class, 
RDD.class, Dataset.class,
                        BinaryBlockMatrix.class, BinaryBlockFrame.class, 
Matrix.class, Frame.class, (new double[][] {}).getClass(),
                        MatrixBlock.class, URL.class };
 
@@ -491,8 +492,8 @@ public final class MLContextUtil {
                } else if (value instanceof FrameBlock) {
                        FrameBlock frameBlock = (FrameBlock) value;
                        return 
MLContextConversionUtil.frameBlockToFrameObject(name, frameBlock, 
(FrameMetadata) metadata);
-               } else if (value instanceof DataFrame) {
-                       DataFrame dataFrame = (DataFrame) value;
+               } else if (value instanceof Dataset<?>) {
+                       Dataset<Row> dataFrame = (Dataset<Row>) value;
 
                        if (hasMatrixMetadata) {
                                return 
MLContextConversionUtil.dataFrameToMatrixObject(name, dataFrame, 
(MatrixMetadata) metadata);
@@ -578,7 +579,7 @@ public final class MLContextUtil {
         * @return {@code true} if the DataFrame appears to be a matrix,
         *         {@code false} otherwise
         */
-       public static boolean doesDataFrameLookLikeMatrix(DataFrame df) {
+       public static boolean doesDataFrameLookLikeMatrix(Dataset<Row> df) {
                StructType schema = df.schema();
                StructField[] fields = schema.fields();
                if (fields == null) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java 
b/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java
index 9ceef11..17cac1f 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java
@@ -23,7 +23,8 @@ import java.util.Set;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -283,7 +284,7 @@ public class MLResults {
         *            the name of the output
         * @return the output as a {@code DataFrame}
         */
-       public DataFrame getDataFrame(String outputName) {
+       public Dataset<Row> getDataFrame(String outputName) {
                if (isMatrixObject(outputName)) {
                        MatrixObject mo = getMatrixObject(outputName);
                        return 
MLContextConversionUtil.matrixObjectToDataFrame(mo, sparkExecutionContext, 
false);
@@ -349,7 +350,7 @@ public class MLResults {
         * @return the output as a {@code DataFrame} of doubles or vectors with 
an
         *         ID column
         */
-       public DataFrame getDataFrame(String outputName, boolean isVectorDF) {
+       public Dataset<Row> getDataFrame(String outputName, boolean isVectorDF) 
{
                if (isFrameObject(outputName)) {
                        throw new MLContextException("This method currently 
supports only matrices");
                }
@@ -375,7 +376,7 @@ public class MLResults {
         *            the name of the output
         * @return the output as a {@code DataFrame} of doubles with an ID 
column
         */
-       public DataFrame getDataFrameDoubleWithIDColumn(String outputName) {
+       public Dataset<Row> getDataFrameDoubleWithIDColumn(String outputName) {
                if (isFrameObject(outputName)) {
                        throw new MLContextException("This method currently 
supports only matrices");
                }
@@ -401,7 +402,7 @@ public class MLResults {
         *            the name of the output
         * @return the output as a {@code DataFrame} of vectors with an ID 
column
         */
-       public DataFrame getDataFrameVectorWithIDColumn(String outputName) {
+       public Dataset<Row> getDataFrameVectorWithIDColumn(String outputName) {
                if (isFrameObject(outputName)) {
                        throw new MLContextException("This method currently 
supports only matrices");
                }
@@ -427,12 +428,12 @@ public class MLResults {
         *            the name of the output
         * @return the output as a {@code DataFrame} of doubles with no ID 
column
         */
-       public DataFrame getDataFrameDoubleNoIDColumn(String outputName) {
+       public Dataset<Row> getDataFrameDoubleNoIDColumn(String outputName) {
                if (isFrameObject(outputName)) {
                        throw new MLContextException("This method currently 
supports only matrices");
                }
                MatrixObject mo = getMatrixObject(outputName);
-               DataFrame df = 
MLContextConversionUtil.matrixObjectToDataFrame(mo, sparkExecutionContext, 
false);
+               Dataset<Row> df = 
MLContextConversionUtil.matrixObjectToDataFrame(mo, sparkExecutionContext, 
false);
                return df.drop(RDDConverterUtils.DF_ID_COLUMN);
        }
 
@@ -454,12 +455,12 @@ public class MLResults {
         *            the name of the output
         * @return the output as a {@code DataFrame} of vectors with no ID 
column
         */
-       public DataFrame getDataFrameVectorNoIDColumn(String outputName) {
+       public Dataset<Row> getDataFrameVectorNoIDColumn(String outputName) {
                if (isFrameObject(outputName)) {
                        throw new MLContextException("This method currently 
supports only matrices");
                }
                MatrixObject mo = getMatrixObject(outputName);
-               DataFrame df = 
MLContextConversionUtil.matrixObjectToDataFrame(mo, sparkExecutionContext, 
true);
+               Dataset<Row> df = 
MLContextConversionUtil.matrixObjectToDataFrame(mo, sparkExecutionContext, 
true);
                return df.drop(RDDConverterUtils.DF_ID_COLUMN);
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/Matrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/Matrix.java 
b/src/main/java/org/apache/sysml/api/mlcontext/Matrix.java
index aa8033d..9952350 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/Matrix.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/Matrix.java
@@ -21,7 +21,8 @@ package org.apache.sysml.api.mlcontext;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
@@ -102,7 +103,7 @@ public class Matrix {
         * 
         * @return the matrix as a {@code DataFrame} of doubles with an ID 
column
         */
-       public DataFrame toDF() {
+       public Dataset<Row> toDF() {
                return 
MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, 
sparkExecutionContext, false);
        }
 
@@ -111,7 +112,7 @@ public class Matrix {
         * 
         * @return the matrix as a {@code DataFrame} of doubles with an ID 
column
         */
-       public DataFrame toDFDoubleWithIDColumn() {
+       public Dataset<Row> toDFDoubleWithIDColumn() {
                return 
MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, 
sparkExecutionContext, false);
        }
 
@@ -120,8 +121,8 @@ public class Matrix {
         * 
         * @return the matrix as a {@code DataFrame} of doubles with no ID 
column
         */
-       public DataFrame toDFDoubleNoIDColumn() {
-               DataFrame df = 
MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, 
sparkExecutionContext, false);
+       public Dataset<Row> toDFDoubleNoIDColumn() {
+               Dataset<Row> df = 
MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, 
sparkExecutionContext, false);
                return df.drop(RDDConverterUtils.DF_ID_COLUMN);
        }
 
@@ -130,7 +131,7 @@ public class Matrix {
         * 
         * @return the matrix as a {@code DataFrame} of vectors with an ID 
column
         */
-       public DataFrame toDFVectorWithIDColumn() {
+       public Dataset<Row> toDFVectorWithIDColumn() {
                return 
MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, 
sparkExecutionContext, true);
        }
 
@@ -139,8 +140,8 @@ public class Matrix {
         * 
         * @return the matrix as a {@code DataFrame} of vectors with no ID 
column
         */
-       public DataFrame toDFVectorNoIDColumn() {
-               DataFrame df = 
MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, 
sparkExecutionContext, true);
+       public Dataset<Row> toDFVectorNoIDColumn() {
+               Dataset<Row> df = 
MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, 
sparkExecutionContext, true);
                return df.drop(RDDConverterUtils.DF_ID_COLUMN);
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java
index 8f33081..9e690d5 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java
@@ -20,12 +20,12 @@
 package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
@@ -73,7 +73,7 @@ public class DataPartitionerRemoteSparkMapper extends 
ParWorker implements PairF
 
 
        @Override
-       public Iterable<Tuple2<Long, Writable>> call(Tuple2<MatrixIndexes, 
MatrixBlock> arg0) 
+       public Iterator<Tuple2<Long, Writable>> call(Tuple2<MatrixIndexes, 
MatrixBlock> arg0) 
                throws Exception 
        {       
                List<Tuple2<Long, Writable>> ret = new LinkedList<Tuple2<Long, 
Writable>>();
@@ -148,7 +148,7 @@ public class DataPartitionerRemoteSparkMapper extends 
ParWorker implements PairF
                                throw new DMLRuntimeException("Unsupported 
partition format: "+_dpf);
                }
                
-               return ret;
+               return ret.iterator();
        }
        
 }
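
The Iterable-to-Iterator change above is the other recurring edit in this
patch: Spark 2.0 changed FlatMapFunction.call and PairFlatMapFunction.call to
return java.util.Iterator instead of Iterable, so each implementation now
collects results into a list and returns list.iterator(). A minimal standalone
sketch (an illustrative word-count style function, not code from this commit):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.function.PairFlatMapFunction;

    import scala.Tuple2;

    public class SplitToPairs implements PairFlatMapFunction<String, String, Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<Tuple2<String, Integer>> call(String line) {
            List<Tuple2<String, Integer>> out = new ArrayList<>();
            for (String word : line.split(" "))
                out.add(new Tuple2<>(word, 1));
            return out.iterator();   // Spark 1.6 returned the Iterable 'out' directly
        }
    }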

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 2a0dc7d..c973115 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -94,7 +94,7 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
        }
        
        @Override 
-       public Iterable<Tuple2<Long, String>> call(Iterator<Tuple2<Long, 
Iterable<Writable>>> arg0)
+       public Iterator<Tuple2<Long, String>> call(Iterator<Tuple2<Long, 
Iterable<Writable>>> arg0)
                throws Exception 
        {
                ArrayList<Tuple2<Long,String>> ret = new 
ArrayList<Tuple2<Long,String>>();
@@ -137,7 +137,7 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                                ret.add(new Tuple2<Long,String>(_workerID, 
val));
                }       
                
-               return ret;
+               return ret.iterator();
        }
 
        private void configureWorker( long ID ) 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 4dc5126..75e5137 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -21,11 +21,11 @@ package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.Accumulator;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
@@ -60,7 +60,7 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
        }
        
        @Override 
-       public Iterable<Tuple2<Long, String>> call(Task arg0)
+       public Iterator<Tuple2<Long, String>> call(Task arg0)
                throws Exception 
        {
                //lazy parworker initialization
@@ -82,7 +82,7 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                for( String val : tmp )
                        ret.add(new Tuple2<Long,String>(_workerID, val));
                        
-               return ret;
+               return ret.iterator();
        }
 
        private void configureWorker( long ID ) 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
index 3d16391..092af3b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
@@ -176,7 +176,7 @@ public class AppendGSPInstruction extends 
BinarySPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
                        throws Exception 
                {
                        //common preparation
@@ -240,7 +240,7 @@ public class AppendGSPInstruction extends 
BinarySPInstruction
                                }
                        }
                        
-                       return retVal;
+                       return retVal.iterator();
                }
        }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
index 5a26167..ab0c3ed 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
@@ -134,7 +135,7 @@ public class CumulativeOffsetSPInstruction extends 
BinarySPInstruction
                }
                
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( 
Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( 
Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
                        throws Exception 
                {
                        ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
@@ -167,7 +168,7 @@ public class CumulativeOffsetSPInstruction extends 
BinarySPInstruction
                                        ret.add(new 
Tuple2<MatrixIndexes,MatrixBlock>(tmpix, tmpblk));
                                }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
index 6e25446..e19b617 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java
@@ -208,13 +208,13 @@ public class FrameIndexingSPInstruction  extends 
IndexingSPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<Long, 
FrameBlock> rightKV) 
+               public Iterator<Tuple2<Long, FrameBlock>> call(Tuple2<Long, 
FrameBlock> rightKV) 
                        throws Exception 
                {
                        Pair<Long,FrameBlock> in = 
SparkUtils.toIndexedFrameBlock(rightKV);                     
                        ArrayList<Pair<Long,FrameBlock>> out = new 
ArrayList<Pair<Long,FrameBlock>>();
                        OperationsOnMatrixValues.performShift(in, _ixrange, 
_brlen, _bclen, _rlen, _clen, out);
-                       return SparkUtils.fromIndexedFrameBlock(out);
+                       return SparkUtils.fromIndexedFrameBlock(out).iterator();
                }               
        }
 
@@ -237,7 +237,7 @@ public class FrameIndexingSPInstruction  extends 
IndexingSPInstruction
                }
                
                @Override
-               public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<Long, 
FrameBlock> kv) 
+               public Iterator<Tuple2<Long, FrameBlock>> call(Tuple2<Long, 
FrameBlock> kv) 
                        throws Exception 
                {
                        ArrayList<Pair<Long,FrameBlock>> out = new 
ArrayList<Pair<Long,FrameBlock>>();
@@ -269,7 +269,7 @@ public class FrameIndexingSPInstruction  extends 
IndexingSPInstruction
                                curBlockRange.rowStart =  lGblStartRow + _brlen;
                                iRowStartDest = 
UtilFunctions.computeCellInBlock(iRowStartDest+iMaxRowsToCopy+1, _brlen);
                        }
-                       return SparkUtils.fromIndexedFrameBlock(out);
+                       return SparkUtils.fromIndexedFrameBlock(out).iterator();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index dfe8791..310664f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -414,7 +414,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
                }
                
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( 
Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( 
Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
                        throws Exception 
                {
                        ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
@@ -458,7 +458,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
                                }
                        }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
index 7100011..0765873 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java
@@ -121,7 +121,7 @@ public class MatrixAppendMSPInstruction extends 
AppendMSPInstruction
                }
                
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
                        throws Exception 
                {
                        ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
@@ -183,7 +183,7 @@ public class MatrixAppendMSPInstruction extends 
AppendMSPInstruction
                                
ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist));
                        }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index 8d4d296..5b1012f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -297,13 +297,13 @@ public class MatrixIndexingSPInstruction  extends 
IndexingSPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<MatrixIndexes, MatrixBlock> rightKV) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<MatrixIndexes, MatrixBlock> rightKV) 
                        throws Exception 
                {
                        IndexedMatrixValue in = 
SparkUtils.toIndexedMatrixBlock(rightKV);                       
                        ArrayList<IndexedMatrixValue> out = new 
ArrayList<IndexedMatrixValue>();
                        OperationsOnMatrixValues.performShift(in, _ixrange, 
_brlen, _bclen, _rlen, _clen, out);
-                       return SparkUtils.fromIndexedMatrixBlock(out);
+                       return 
SparkUtils.fromIndexedMatrixBlock(out).iterator();
                }               
        }
 
@@ -420,13 +420,13 @@ public class MatrixIndexingSPInstruction  extends 
IndexingSPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<MatrixIndexes, MatrixBlock> kv) 
                        throws Exception 
                {       
                        IndexedMatrixValue in = 
SparkUtils.toIndexedMatrixBlock(kv);
                        ArrayList<IndexedMatrixValue> outlist = new 
ArrayList<IndexedMatrixValue>();
                        OperationsOnMatrixValues.performSlice(in, _ixrange, 
_brlen, _bclen, outlist);
-                       return SparkUtils.fromIndexedMatrixBlock(outlist);
+                       return 
SparkUtils.fromIndexedMatrixBlock(outlist).iterator();
                }               
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
index 6b27240..5941285 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -126,7 +127,7 @@ public class MatrixReshapeSPInstruction extends 
UnarySPInstruction
                }
                
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( 
Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( 
Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
                        throws Exception 
                {
                        //input conversion (for libmatrixreorg compatibility)
@@ -138,7 +139,7 @@ public class MatrixReshapeSPInstruction extends 
UnarySPInstruction
                     out, _mcOut.getRows(), _mcOut.getCols(), 
_mcOut.getRowsPerBlock(), _mcOut.getColsPerBlock(), _byrow);
 
                        //output conversion (for compatibility w/ rdd schema)
-                       return SparkUtils.fromIndexedMatrixBlock(out);
+                       return 
SparkUtils.fromIndexedMatrixBlock(out).iterator();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index 319d833..daa1ce5 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -210,7 +210,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
                }
                
                @Override
-               public Iterable<Tuple2<Integer, Object>> call(Iterator<Tuple2<Long, FrameBlock>> iter)
+               public Iterator<Tuple2<Integer, Object>> call(Iterator<Tuple2<Long, FrameBlock>> iter)
                        throws Exception 
                {
                        //build meta data (e.g., recode maps)
@@ -226,7 +226,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                                        ret.add(new 
Tuple2<Integer,Object>(e1.getKey(), token));
                        _raEncoder.getCPRecodeMapsPartial().clear();
                
-                       return ret;
+                       return ret.iterator();
                }
        }
        
@@ -249,7 +249,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
                }
                
                @Override
-               public Iterable<String> call(Tuple2<Integer, Iterable<Object>> arg0)
+               public Iterator<String> call(Tuple2<Integer, Iterable<Object>> arg0)
                        throws Exception 
                {
                        String colID = String.valueOf(arg0._1());
@@ -271,7 +271,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                        }
                        _accMax.add(rowID-1);
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
 
@@ -306,7 +306,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
                }
                
                @Override
-               public Iterable<Tuple2<Integer, ColumnMetadata>> call(Iterator<Tuple2<Long, FrameBlock>> iter)
+               public Iterator<Tuple2<Integer, ColumnMetadata>> call(Iterator<Tuple2<Long, FrameBlock>> iter)
                        throws Exception 
                {
                        //build meta data (e.g., histograms and means)
@@ -335,7 +335,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                                }
                        }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
 
@@ -350,7 +350,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
                }
 
                @Override
-               public Iterable<String> call(Tuple2<Integer, Iterable<ColumnMetadata>> arg0)
+               public Iterator<String> call(Tuple2<Integer, Iterable<ColumnMetadata>> arg0)
                                throws Exception 
                {
                        int colix = arg0._1();
@@ -392,7 +392,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction 
extends ComputationSPI
                                        ret.add("-2 " + colix + " " + 
iter.next().getMvValue());
                        }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
 }
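
The functions in this file that take an Iterator<Tuple2<Long, FrameBlock>> run per partition (mapPartitions style), so only the output side changes here: the input was already an Iterator, and the result list now has to be handed back via .iterator(). A hypothetical partition-wise sketch under the Spark 2.x interface follows; the class name and data types are invented for illustration, not taken from SystemML.

import java.util.ArrayList;
import java.util.Iterator;

import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

// Hypothetical example: count the rows seen in one partition and emit a
// single (tag, count) pair. The input is the partition iterator; since
// Spark 2.x the output must also be an Iterator, hence ret.iterator().
public class CountRowsPerPartition
        implements PairFlatMapFunction<Iterator<Tuple2<Long, String>>, Integer, Long>
{
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<Tuple2<Integer, Long>> call(Iterator<Tuple2<Long, String>> iter)
                throws Exception
        {
                long count = 0;
                while( iter.hasNext() ) {
                        iter.next();
                        count++;
                }
                ArrayList<Tuple2<Integer, Long>> ret = new ArrayList<Tuple2<Integer, Long>>();
                ret.add(new Tuple2<Integer, Long>(0, count));
                return ret.iterator();
        }
}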

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
index 8aaa8b6..5efba81 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark;
 
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -178,7 +179,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
                        throws Exception 
                {
                        PartitionedBlock<MatrixBlock> pm = _pbc.value();
@@ -204,7 +205,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction
                                ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut));
                        }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 06decc0..d7f4f7d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -543,7 +544,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, Tuple2<MatrixBlock, MatrixBlock>> arg0)
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, Tuple2<MatrixBlock, MatrixBlock>> arg0)
                        throws Exception 
                {
                        //prepare inputs (for internal api compatibility)
@@ -555,7 +556,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                        LibMatrixReorg.rmempty(data, offsets, _rmRows, _len, _brlen, _bclen, out);
 
                        //prepare and return outputs
-                       return SparkUtils.fromIndexedMatrixBlock(out);
+                       return SparkUtils.fromIndexedMatrixBlock(out).iterator();
                }
        }
 
@@ -580,7 +581,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
                        throws Exception 
                {
                        //prepare inputs (for internal api compatibility)
@@ -597,7 +598,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                        LibMatrixReorg.rmempty(data, offsets, _rmRows, _len, _brlen, _bclen, out);
 
                        //prepare and return outputs
-                       return SparkUtils.fromIndexedMatrixBlock(out);
+                       return SparkUtils.fromIndexedMatrixBlock(out).iterator();
                }
        }
 
@@ -623,7 +624,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
                        throws Exception 
                {
                        //prepare inputs (for internal api compatibility)
@@ -634,7 +635,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                        LibMatrixReorg.rexpand(data, _maxVal, _dirRows, _cast, _ignore, _brlen, _bclen, out);
                        
                        //prepare and return outputs
-                       return SparkUtils.fromIndexedMatrixBlock(out);
+                       return SparkUtils.fromIndexedMatrixBlock(out).iterator();
                }
        }
        
@@ -658,7 +659,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
                        throws Exception 
                {
                        //get all inputs
@@ -672,7 +673,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                        OperationsOnMatrixValues.performMapGroupedAggregate(_op, in1, groups, _ngroups, _brlen, _bclen, outlist);
                        
                        //output all result blocks
-                       return SparkUtils.fromIndexedMatrixBlock(outlist);
+                       return SparkUtils.fromIndexedMatrixBlock(outlist).iterator();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
index c3d2c27..ce87c46 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark;
 
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -130,7 +131,7 @@ public class PmmSPInstruction extends BinarySPInstruction
                }
                
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
                        throws Exception 
                {
                        ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
@@ -170,7 +171,7 @@ public class PmmSPInstruction extends BinarySPInstruction
                                        ret.add(new Tuple2<MatrixIndexes, 
MatrixBlock>(new MatrixIndexes(rowIX2, ixIn.getColumnIndex()), out2));
                        }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
index 063b6e7..0ae8a67 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
@@ -548,7 +548,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
                }
                
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
                        throws Exception 
                {
                        LinkedList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new LinkedList<Tuple2<MatrixIndexes, MatrixBlock>>();
@@ -579,7 +579,7 @@ public class QuaternarySPInstruction extends 
ComputationSPInstruction
                        }
                        
                        //output list of new tuples
-                       return ret;
+                       return ret.iterator();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
index 1f66825..8df8b26 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.Random;
 
 import org.apache.commons.math3.distribution.PoissonDistribution;
@@ -626,7 +627,7 @@ public class RandSPInstruction extends UnarySPInstruction
                }
                
                @Override
-               public Iterable<Double> call(SampleTask t)
+               public Iterator<Double> call(SampleTask t)
                                throws Exception {
 
                        long st = t.range_start;
@@ -660,7 +661,7 @@ public class RandSPInstruction extends UnarySPInstruction
                                                        retList.add((double) i);
                                }
                        }
-                       return retList;
+                       return retList.iterator();
                }
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
index fb4be29..62ffbce 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
@@ -239,7 +240,7 @@ public class ReorgSPInstruction extends UnarySPInstruction
                }
                
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
                        throws Exception 
                {
                        ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
@@ -264,7 +265,7 @@ public class ReorgSPInstruction extends UnarySPInstruction
                                }
                        }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
 
@@ -281,7 +282,7 @@ public class ReorgSPInstruction extends UnarySPInstruction
                }
                
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
                        throws Exception 
                {
                        //construct input
@@ -292,7 +293,7 @@ public class ReorgSPInstruction extends UnarySPInstruction
                        LibMatrixReorg.rev(in, _mcIn.getRows(), _mcIn.getRowsPerBlock(), out);
                        
                        //construct output
-                       return SparkUtils.fromIndexedMatrixBlock(out);
+                       return SparkUtils.fromIndexedMatrixBlock(out).iterator();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
index 0e01bb4..e6b5755 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 
+import java.util.Iterator;
 import java.util.LinkedList;
 
 import org.apache.spark.api.java.JavaPairRDD;
@@ -119,7 +120,7 @@ public class RmmSPInstruction extends BinarySPInstruction
                }
                
                @Override
-               public Iterable<Tuple2<TripleIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
+               public Iterator<Tuple2<TripleIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) 
                        throws Exception 
                {
                        LinkedList<Tuple2<TripleIndexes, MatrixBlock>> ret = new LinkedList<Tuple2<TripleIndexes, MatrixBlock>>();
@@ -152,7 +153,7 @@ public class RmmSPInstruction extends BinarySPInstruction
                        }
                        
                        //output list of new tuples
-                       return ret;
+                       return ret.iterator();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/TernarySPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/TernarySPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/TernarySPInstruction.java
index 8b0eb42..a25dcf0 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/TernarySPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/TernarySPInstruction.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.instructions.spark;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
@@ -258,7 +259,7 @@ public class TernarySPInstruction extends ComputationSPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, Double>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) 
+               public Iterator<Tuple2<MatrixIndexes, Double>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) 
                        throws Exception 
                {
                        MatrixIndexes ix = arg0._1();
@@ -279,7 +280,7 @@ public class TernarySPInstruction extends 
ComputationSPInstruction
                                        retVal.add(new 
Tuple2<MatrixIndexes,Double>(p.getKey(), p.getValue()));
                        }
                        
-                       return retVal;
+                       return retVal.iterator();
                }
        }
        
@@ -460,7 +461,7 @@ public class TernarySPInstruction extends ComputationSPInstruction
                
                @SuppressWarnings("deprecation")
                @Override
-               public Iterable<Tuple2<MatrixIndexes, Double>> call(CTableMap ctableMap)
+               public Iterator<Tuple2<MatrixIndexes, Double>> call(CTableMap ctableMap)
                                throws Exception {
                        ArrayList<Tuple2<MatrixIndexes, Double>> retVal = new 
ArrayList<Tuple2<MatrixIndexes, Double>>();
                        
@@ -472,7 +473,7 @@ public class TernarySPInstruction extends 
ComputationSPInstruction
                                // retVal.add(new Tuple2<MatrixIndexes, 
MatrixCell>(blockIndexes, cell));
                                retVal.add(new Tuple2<MatrixIndexes, 
Double>(new MatrixIndexes(i, j), v));
                        }
-                       return retVal;
+                       return retVal.iterator();
                }
                
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
index 8fe9a7d..da917b5 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark;
 
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.spark.api.java.JavaPairRDD;
@@ -145,7 +146,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
                        throws Exception 
                {
                        List<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
@@ -174,7 +175,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction
                                ret.add(new 
Tuple2<MatrixIndexes,MatrixBlock>(ixout3, out3));
                        }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
index 792e0b6..cd6e58d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
@@ -32,7 +32,7 @@ public class ConvertFrameBlockToIJVLines implements 
FlatMapFunction<Tuple2<Long,
        private static final long serialVersionUID = 1803516615963340115L;
 
        @Override
-       public Iterable<String> call(Tuple2<Long, FrameBlock> kv) 
+       public Iterator<String> call(Tuple2<Long, FrameBlock> kv) 
                throws Exception 
        {
                long rowoffset = kv._1;
@@ -68,6 +68,6 @@ public class ConvertFrameBlockToIJVLines implements 
FlatMapFunction<Tuple2<Long,
                        }
                }
                
-               return cells;
+               return cells.iterator();
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertMatrixBlockToIJVLines.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertMatrixBlockToIJVLines.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertMatrixBlockToIJVLines.java
index 788615a..621fb06 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertMatrixBlockToIJVLines.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertMatrixBlockToIJVLines.java
@@ -39,12 +39,12 @@ public class ConvertMatrixBlockToIJVLines implements FlatMapFunction<Tuple2<Matr
        }
        
        @Override
-       public Iterable<String> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception {
+       public Iterator<String> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception {
                final BinaryBlockToTextCellConverter converter = new BinaryBlockToTextCellConverter();
                converter.setBlockSize(brlen, bclen);
                converter.convert(kv._1, kv._2);
                
-               return new Iterable<String>() {
+               Iterable<String> ret = new Iterable<String>() {
                        @Override
                        public Iterator<String> iterator() {
                                return new Iterator<String>() {
@@ -64,6 +64,8 @@ public class ConvertMatrixBlockToIJVLines implements 
FlatMapFunction<Tuple2<Matr
                                };
                        }
                };
+               
+               return ret.iterator();
        }
 
 }
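
The hunk above keeps the existing anonymous Iterable and returns its iterator(); with the Spark 2.x signature a FlatMapFunction may also return a hand-written Iterator directly and skip the Iterable wrapper entirely. A small hypothetical sketch of that style, not taken from the SystemML code base:

import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.spark.api.java.function.FlatMapFunction;

// Hypothetical example: emit each character of the input string as its own
// output line, streaming through a custom Iterator instead of first
// materializing an Iterable.
public class CharsToLines implements FlatMapFunction<String, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterator<String> call(final String s) throws Exception {
                return new Iterator<String>() {
                        private int pos = 0;

                        @Override
                        public boolean hasNext() {
                                return pos < s.length();
                        }

                        @Override
                        public String next() {
                                if( !hasNext() )
                                        throw new NoSuchElementException();
                                return String.valueOf(s.charAt(pos++));
                        }

                        @Override
                        public void remove() {
                                throw new UnsupportedOperationException();
                        }
                };
        }
}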

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
index cc361c6..61b4385 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.instructions.spark.functions;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
@@ -60,7 +61,7 @@ public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2<
        }
        
        @Override
-       public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) 
+       public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) 
                throws Exception 
        {
                MatrixIndexes ixIn = arg0._1();
@@ -107,7 +108,7 @@ public class ExtractBlockForBinaryReblock implements 
PairFlatMapFunction<Tuple2<
                                retVal.add(new Tuple2<MatrixIndexes, 
MatrixBlock>(indx, blk));
                        }
                }
-               return retVal;
+               return retVal.iterator();
        }
 
        private long getEndGlobalIndex(long blockIndex, boolean isIn, boolean 
isRow) 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
index 1ab8ecd..56c1c46 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark.functions;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
@@ -109,7 +110,7 @@ public abstract class ExtractGroup implements Serializable
                }
                
                @Override
-               public Iterable<Tuple2<MatrixIndexes, WeightedCell>> call(
+               public Iterator<Tuple2<MatrixIndexes, WeightedCell>> call(
                                Tuple2<MatrixIndexes, Tuple2<MatrixBlock, 
MatrixBlock>> arg)
                                throws Exception 
                {
@@ -117,7 +118,7 @@ public abstract class ExtractGroup implements Serializable
                        MatrixBlock group = arg._2._1;
                        MatrixBlock target = arg._2._2;
        
-                       return execute(ix, group, target);
+                       return execute(ix, group, target).iterator();
                }       
        }
 
@@ -133,7 +134,7 @@ public abstract class ExtractGroup implements Serializable
                }
                
                @Override
-               public Iterable<Tuple2<MatrixIndexes, WeightedCell>> call(
+               public Iterator<Tuple2<MatrixIndexes, WeightedCell>> call(
                                Tuple2<MatrixIndexes, MatrixBlock> arg)
                                throws Exception 
                {
@@ -141,7 +142,7 @@ public abstract class ExtractGroup implements Serializable
                        MatrixBlock group = 
_pbm.getBlock((int)ix.getRowIndex(), 1);
                        MatrixBlock target = arg._2;
                        
-                       return execute(ix, group, target);
+                       return execute(ix, group, target).iterator();
                }       
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroupNWeights.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroupNWeights.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroupNWeights.java
index 372ebea..d040dbc 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroupNWeights.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroupNWeights.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.instructions.spark.functions;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
@@ -35,7 +36,7 @@ public class ExtractGroupNWeights implements 
PairFlatMapFunction<Tuple2<MatrixIn
        private static final long serialVersionUID = -188180042997588072L;
 
        @Override
-       public Iterable<Tuple2<MatrixIndexes, WeightedCell>> call(
+       public Iterator<Tuple2<MatrixIndexes, WeightedCell>> call(
                        Tuple2<MatrixIndexes, Tuple2<Tuple2<MatrixBlock, 
MatrixBlock>, MatrixBlock>> arg)
                throws Exception 
        {
@@ -62,6 +63,6 @@ public class ExtractGroupNWeights implements 
PairFlatMapFunction<Tuple2<MatrixIn
                        groupValuePairs.add(new Tuple2<MatrixIndexes, 
WeightedCell>(ix, weightedCell));
                }
                
-               return groupValuePairs;
+               return groupValuePairs.iterator();
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ReplicateVectorFunction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ReplicateVectorFunction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ReplicateVectorFunction.java
index 329d322..8c6eb92 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ReplicateVectorFunction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ReplicateVectorFunction.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.instructions.spark.functions;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
@@ -44,7 +45,7 @@ public class ReplicateVectorFunction implements PairFlatMapFunction<Tuple2<Matri
        }
        
        @Override
-       public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) 
+       public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) 
                throws Exception 
        {
                MatrixIndexes ix = arg0._1();
@@ -66,6 +67,6 @@ public class ReplicateVectorFunction implements 
PairFlatMapFunction<Tuple2<Matri
                                retVal.add(new Tuple2<MatrixIndexes, 
MatrixBlock>(new MatrixIndexes(ix.getRowIndex(), i), mb));
                }
                
-               return retVal;
+               return retVal.iterator();
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java
index 3bf2f67..0dd83be 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java
@@ -172,8 +172,9 @@ public class SparkListener extends RDDOperationGraphListener {
                
                synchronized(currentInstructions) {
                        if(stageTaskMapping.containsKey(stageID)) {
-                               Option<String> errorMessage = Option.apply(null); // TODO
-                               TaskUIData taskData = new TaskUIData(taskEnd.taskInfo(), Option.apply(taskEnd.taskMetrics()), errorMessage);
+                               //Option<String> errorMessage = Option.apply(null); // TODO
+                               //TaskUIData taskData = new TaskUIData(taskEnd.taskInfo(), Option.apply(taskEnd.taskMetrics()), errorMessage);
+                               TaskUIData taskData = new TaskUIData(taskEnd.taskInfo(), null); //TODO
                                stageTaskMapping.get(stageID).add(taskData);
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 619089b..3196f09 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -39,7 +39,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.mllib.linalg.Vector;
 import org.apache.spark.mllib.linalg.VectorUDT;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SQLContext;
@@ -230,14 +230,14 @@ public class FrameRDDConverterUtils
        // DataFrame <--> Binary block
 
        public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-                       DataFrame df, MatrixCharacteristics mc, boolean containsID) 
+                       Dataset<Row> df, MatrixCharacteristics mc, boolean containsID) 
                throws DMLRuntimeException 
        {
                return dataFrameToBinaryBlock(sc, df, mc, containsID, new Pair<String[], ValueType[]>());
        }
 
        public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-                       DataFrame df, MatrixCharacteristics mc, boolean containsID, Pair<String[],ValueType[]> out) 
+                       Dataset<Row> df, MatrixCharacteristics mc, boolean containsID, Pair<String[],ValueType[]> out) 
                throws DMLRuntimeException 
        {
                //determine unknown dimensions if required
@@ -267,7 +267,7 @@ public class FrameRDDConverterUtils
                                new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID, colVect));
        }
 
-       public static DataFrame binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<Long,FrameBlock> in, 
+       public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<Long,FrameBlock> in, 
                        MatrixCharacteristics mc, ValueType[] schema)
        {
                if( !mc.colsKnown() )
@@ -460,7 +460,7 @@ public class FrameRDDConverterUtils
                private static final long serialVersionUID = 
-5789003262381127469L;
 
                @Override
-               public Iterable<Long> call(Iterator<Long> arg0) throws Exception 
+               public Iterator<Long> call(Iterator<Long> arg0) throws Exception 
                {
                        long max = 0;
                        while( max >= 0 && arg0.hasNext() ) {
@@ -470,7 +470,7 @@ public class FrameRDDConverterUtils
                        
                        ArrayList<Long> ret = new ArrayList<Long>();    
                        ret.add(max);
-                       return ret;
+                       return ret.iterator();
                }
        }
        
@@ -568,7 +568,7 @@ public class FrameRDDConverterUtils
                }
 
                @Override
-               public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0) 
+               public Iterator<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0) 
                        throws Exception 
                {
                        ArrayList<Tuple2<Long,FrameBlock>> ret = new 
ArrayList<Tuple2<Long,FrameBlock>>();
@@ -612,7 +612,7 @@ public class FrameRDDConverterUtils
                        //flush last blocks
                        flushBlocksToList(ix, fb, ret);
                
-                       return ret;
+                       return ret.iterator();
                }
                
                // Creates new state of empty column blocks for current global 
row index.
@@ -658,7 +658,7 @@ public class FrameRDDConverterUtils
                }
 
                @Override
-               public Iterable<String> call(Tuple2<Long, FrameBlock> arg0)
+               public Iterator<String> call(Tuple2<Long, FrameBlock> arg0)
                        throws Exception 
                {
                        Long ix = arg0._1();
@@ -705,7 +705,7 @@ public class FrameRDDConverterUtils
                                sb.setLength(0); //reset
                        }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
        
@@ -734,7 +734,7 @@ public class FrameRDDConverterUtils
                }
                
                @Override
-               public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) 
+               public Iterator<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) 
                        throws Exception 
                {
                        ArrayList<Tuple2<Long,FrameBlock>> ret = new 
ArrayList<Tuple2<Long,FrameBlock>>();
@@ -776,7 +776,7 @@ public class FrameRDDConverterUtils
                        //flush last blocks
                        flushBlocksToList(ix, fb, ret);
                
-                       return ret;
+                       return ret.iterator();
                }
 
                private static void flushBlocksToList( Long ix, FrameBlock fb, 
ArrayList<Tuple2<Long,FrameBlock>> ret ) 
@@ -792,7 +792,7 @@ public class FrameRDDConverterUtils
                private static final long serialVersionUID = 
8093340778966667460L;
                
                @Override
-               public Iterable<Row> call(Tuple2<Long, FrameBlock> arg0)
+               public Iterator<Row> call(Tuple2<Long, FrameBlock> arg0)
                        throws Exception 
                {
                        long rowIndex = arg0._1();
@@ -810,7 +810,7 @@ public class FrameRDDConverterUtils
                                ret.add(RowFactory.create(row));
                        }
                        
-                       return ret;
+                       return ret.iterator();
                }
        }
        
@@ -855,7 +855,7 @@ public class FrameRDDConverterUtils
                }
 
                @Override
-               public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Text> arg0) 
+               public Iterator<Tuple2<Long, FrameBlock>> call(Iterator<Text> arg0) 
                        throws Exception 
                {
                        ArrayList<Tuple2<Long,FrameBlock>> ret = new 
ArrayList<Tuple2<Long,FrameBlock>>();
@@ -886,7 +886,7 @@ public class FrameRDDConverterUtils
                        //final flush buffer
                        flushBufferToList(rbuff, ret);
                
-                       return ret;
+                       return ret.iterator();
                }
        }
        
@@ -911,7 +911,7 @@ public class FrameRDDConverterUtils
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes,MatrixBlock> arg0) 
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes,MatrixBlock> arg0) 
                        throws Exception 
                {
                        ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
@@ -939,7 +939,7 @@ public class FrameRDDConverterUtils
                                ret.add(new Tuple2<MatrixIndexes, 
MatrixBlock>(ixout,out));                             
                        }
 
-                       return ret;
+                       return ret.iterator();
                }
                
                /**
@@ -992,7 +992,7 @@ public class FrameRDDConverterUtils
                }
 
                @Override
-               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<Long, FrameBlock> arg0)
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<Long, FrameBlock> arg0)
                        throws Exception 
                {
                        long rowIndex = arg0._1();
@@ -1028,7 +1028,7 @@ public class FrameRDDConverterUtils
                                }
                        }
 
-                       return ret;
+                       return ret.iterator();
                }
        }
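
The remaining changes in this file replace org.apache.spark.sql.DataFrame with Dataset<Row>, since Spark 2.x dropped the separate Java DataFrame class in favor of the Dataset<Row> alias. A hypothetical caller-side sketch of what that migration looks like; the input path and class name are invented, and SQLContext is kept only because the signatures above still use it:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

// Hypothetical usage sketch: reading a JSON file now yields Dataset<Row>
// instead of DataFrame, but the rest of the call pattern is unchanged.
public class DatasetMigrationExample {
        public static void main(String[] args) {
                JavaSparkContext sc = new JavaSparkContext("local", "DatasetMigrationExample");
                SQLContext sqlctx = new SQLContext(sc);

                // Spark 1.x: DataFrame df = sqlctx.read().json("in.json");
                Dataset<Row> df = sqlctx.read().json("in.json"); // Spark 2.x

                df.printSchema();
                sc.close();
        }
}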
        
