[SYSTEMML-776] Upgrade Spark version. Switched DataFrame to Dataset, switched Iterable to Iterator, and updated constructors for Spark 2.0.2.
Closes #340. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/a4c7be78 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/a4c7be78 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/a4c7be78 Branch: refs/heads/master Commit: a4c7be78390d01a3194e726d7a184c182bd8b558 Parents: 627fdbe Author: Glenn Weidner <gweid...@us.ibm.com> Authored: Mon Jan 16 16:54:35 2017 -0800 Committer: Glenn Weidner <gweid...@us.ibm.com> Committed: Mon Jan 16 16:54:35 2017 -0800 ---------------------------------------------------------------------- pom.xml | 6 +- src/main/java/org/apache/sysml/api/MLBlock.java | 4 +- .../java/org/apache/sysml/api/MLContext.java | 11 ++-- .../java/org/apache/sysml/api/MLMatrix.java | 15 ++--- .../java/org/apache/sysml/api/MLOutput.java | 11 ++-- .../sysml/api/mlcontext/BinaryBlockFrame.java | 9 +-- .../sysml/api/mlcontext/BinaryBlockMatrix.java | 9 +-- .../org/apache/sysml/api/mlcontext/Frame.java | 5 +- .../api/mlcontext/MLContextConversionUtil.java | 25 ++++---- .../sysml/api/mlcontext/MLContextUtil.java | 11 ++-- .../apache/sysml/api/mlcontext/MLResults.java | 19 +++--- .../org/apache/sysml/api/mlcontext/Matrix.java | 17 ++--- .../DataPartitionerRemoteSparkMapper.java | 6 +- .../parfor/RemoteDPParForSparkWorker.java | 4 +- .../parfor/RemoteParForSparkWorker.java | 6 +- .../spark/AppendGSPInstruction.java | 4 +- .../spark/CumulativeOffsetSPInstruction.java | 5 +- .../spark/FrameIndexingSPInstruction.java | 8 +-- .../instructions/spark/MapmmSPInstruction.java | 4 +- .../spark/MatrixAppendMSPInstruction.java | 4 +- .../spark/MatrixIndexingSPInstruction.java | 8 +-- .../spark/MatrixReshapeSPInstruction.java | 5 +- ...ReturnParameterizedBuiltinSPInstruction.java | 16 ++--- .../instructions/spark/PMapmmSPInstruction.java | 5 +- .../ParameterizedBuiltinSPInstruction.java | 17 ++--- .../instructions/spark/PmmSPInstruction.java | 5 +- 
.../spark/QuaternarySPInstruction.java | 4 +- .../instructions/spark/RandSPInstruction.java | 5 +- .../instructions/spark/ReorgSPInstruction.java | 9 +-- .../instructions/spark/RmmSPInstruction.java | 5 +- .../spark/TernarySPInstruction.java | 9 +-- .../instructions/spark/Tsmm2SPInstruction.java | 5 +- .../functions/ConvertFrameBlockToIJVLines.java | 4 +- .../functions/ConvertMatrixBlockToIJVLines.java | 6 +- .../functions/ExtractBlockForBinaryReblock.java | 5 +- .../spark/functions/ExtractGroup.java | 9 +-- .../spark/functions/ExtractGroupNWeights.java | 5 +- .../functions/ReplicateVectorFunction.java | 5 +- .../spark/functions/SparkListener.java | 5 +- .../spark/utils/FrameRDDConverterUtils.java | 40 ++++++------ .../spark/utils/RDDConverterUtils.java | 34 +++++----- .../spark/utils/RDDConverterUtilsExt.java | 14 ++--- .../instructions/spark/utils/RDDSortUtils.java | 32 +++++----- .../sysml/runtime/transform/GenTfMtdSPARK.java | 4 +- .../org/apache/sysml/api/ml/ScriptsUtils.scala | 2 +- .../functions/frame/FrameConverterTest.java | 29 ++++++--- .../DataFrameMatrixConversionTest.java | 5 +- .../DataFrameRowFrameConversionTest.java | 5 +- .../DataFrameVectorFrameConversionTest.java | 6 +- .../mlcontext/DataFrameVectorScriptTest.java | 6 +- .../functions/mlcontext/FrameTest.java | 8 +-- .../mlcontext/MLContextFrameTest.java | 26 ++++---- .../integration/mlcontext/MLContextTest.java | 66 ++++++++++---------- .../sysml/api/ml/LogisticRegressionSuite.scala | 3 +- 54 files changed, 319 insertions(+), 276 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4bccc2b..8224b1d 100644 --- a/pom.xml +++ b/pom.xml @@ -65,9 +65,9 @@ <properties> <hadoop.version>2.4.1</hadoop.version> <antlr.version>4.5.3</antlr.version> - <spark.version>1.6.0</spark.version> - 
<scala.version>2.10.5</scala.version> - <scala.binary.version>2.10</scala.binary.version> + <spark.version>2.0.2</spark.version> + <scala.version>2.11.8</scala.version> + <scala.binary.version>2.11</scala.binary.version> <scala.test.version>2.2.6</scala.test.version> <maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss z</maven.build.timestamp.format> <enableGPU>false</enableGPU> http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/MLBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLBlock.java b/src/main/java/org/apache/sysml/api/MLBlock.java index 934c4b0..d569c54 100644 --- a/src/main/java/org/apache/sysml/api/MLBlock.java +++ b/src/main/java/org/apache/sysml/api/MLBlock.java @@ -253,8 +253,8 @@ public class MLBlock implements Row { public static StructType getDefaultSchemaForBinaryBlock() { // TODO: StructField[] fields = new StructField[2]; - fields[0] = new StructField("IgnoreSchema", DataType.fromCaseClassString("DoubleType"), true, null); - fields[1] = new StructField("IgnoreSchema1", DataType.fromCaseClassString("DoubleType"), true, null); + fields[0] = new StructField("IgnoreSchema", DataType.fromJson("DoubleType"), true, null); + fields[1] = new StructField("IgnoreSchema1", DataType.fromJson("DoubleType"), true, null); return new StructType(fields); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/MLContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java index a4f25b8..417ce8d 100644 --- a/src/main/java/org/apache/sysml/api/MLContext.java +++ b/src/main/java/org/apache/sysml/api/MLContext.java @@ -35,7 +35,8 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import 
org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.api.jmlc.JMLCUtils; @@ -263,7 +264,7 @@ public class MLContext { * @param df the DataFrame * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void registerInput(String varName, DataFrame df) throws DMLRuntimeException { + public void registerInput(String varName, Dataset<Row> df) throws DMLRuntimeException { registerInput(varName, df, false); } @@ -278,7 +279,7 @@ public class MLContext { * @param df the DataFrame * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void registerFrameInput(String varName, DataFrame df) throws DMLRuntimeException { + public void registerFrameInput(String varName, Dataset<Row> df) throws DMLRuntimeException { registerFrameInput(varName, df, false); } @@ -292,7 +293,7 @@ public class MLContext { * @param containsID false if the DataFrame has an column ID which denotes the row ID. * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void registerInput(String varName, DataFrame df, boolean containsID) throws DMLRuntimeException { + public void registerInput(String varName, Dataset<Row> df, boolean containsID) throws DMLRuntimeException { int blksz = ConfigurationManager.getBlocksize(); MatrixCharacteristics mcOut = new MatrixCharacteristics(-1, -1, blksz, blksz); JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = RDDConverterUtils @@ -309,7 +310,7 @@ public class MLContext { * @param containsID false if the DataFrame has an column ID which denotes the row ID. 
* @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void registerFrameInput(String varName, DataFrame df, boolean containsID) throws DMLRuntimeException { + public void registerFrameInput(String varName, Dataset<Row> df, boolean containsID) throws DMLRuntimeException { int blksz = ConfigurationManager.getBlocksize(); MatrixCharacteristics mcOut = new MatrixCharacteristics(-1, -1, blksz, blksz); JavaPairRDD<Long, FrameBlock> rdd = FrameRDDConverterUtils.dataFrameToBinaryBlock(new JavaSparkContext(_sc), df, mcOut, containsID); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/MLMatrix.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLMatrix.java b/src/main/java/org/apache/sysml/api/MLMatrix.java index 6019c5a..2bcfb5c 100644 --- a/src/main/java/org/apache/sysml/api/MLMatrix.java +++ b/src/main/java/org/apache/sysml/api/MLMatrix.java @@ -23,10 +23,11 @@ import java.util.List; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.SQLContext.QueryExecution; +import org.apache.spark.sql.execution.QueryExecution; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.apache.spark.sql.types.StructType; import org.apache.sysml.hops.OptimizerUtils; @@ -62,26 +63,26 @@ import scala.Tuple2; result.write("Result_small.mtx", "text") */ -public class MLMatrix extends DataFrame { +public class MLMatrix extends Dataset<Row> { private static final long serialVersionUID = -7005940673916671165L; protected MatrixCharacteristics mc = null; protected MLContext ml = null; protected MLMatrix(SQLContext sqlContext, LogicalPlan logicalPlan, MLContext ml) { - 
super(sqlContext, logicalPlan); + super(sqlContext, logicalPlan, RowEncoder.apply(null)); this.ml = ml; } protected MLMatrix(SQLContext sqlContext, QueryExecution queryExecution, MLContext ml) { - super(sqlContext, queryExecution); + super(sqlContext.sparkSession(), queryExecution, RowEncoder.apply(null)); this.ml = ml; } // Only used internally to set a new MLMatrix after one of matrix operations. // Not to be used externally. - protected MLMatrix(DataFrame df, MatrixCharacteristics mc, MLContext ml) throws DMLRuntimeException { - super(df.sqlContext(), df.logicalPlan()); + protected MLMatrix(Dataset<Row> df, MatrixCharacteristics mc, MLContext ml) throws DMLRuntimeException { + super(df.sqlContext(), df.logicalPlan(), RowEncoder.apply(null)); this.mc = mc; this.ml = ml; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/MLOutput.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java index f760e2b..6acca68 100644 --- a/src/main/java/org/apache/sysml/api/MLOutput.java +++ b/src/main/java/org/apache/sysml/api/MLOutput.java @@ -24,7 +24,8 @@ import java.util.Map; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.StructType; import org.apache.sysml.runtime.DMLRuntimeException; @@ -91,7 +92,7 @@ public class MLOutput { * @return the DataFrame * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public DataFrame getDF(SQLContext sqlContext, String varName) throws DMLRuntimeException { + public Dataset<Row> getDF(SQLContext sqlContext, String varName) throws DMLRuntimeException { if(sqlContext == 
null) { throw new DMLRuntimeException("SQLContext is not created."); } @@ -111,7 +112,7 @@ public class MLOutput { * @return the DataFrame * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public DataFrame getDF(SQLContext sqlContext, String varName, boolean outputVector) throws DMLRuntimeException { + public Dataset<Row> getDF(SQLContext sqlContext, String varName, boolean outputVector) throws DMLRuntimeException { if(sqlContext == null) { throw new DMLRuntimeException("SQLContext is not created."); } @@ -137,7 +138,7 @@ public class MLOutput { * @return the DataFrame * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public DataFrame getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc) + public Dataset<Row> getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc) throws DMLRuntimeException { if(sqlContext == null) @@ -173,7 +174,7 @@ public class MLOutput { } - public DataFrame getDataFrameRDD(String varName, JavaSparkContext jsc) throws DMLRuntimeException { + public Dataset<Row> getDataFrameRDD(String varName, JavaSparkContext jsc) throws DMLRuntimeException { JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName); MatrixCharacteristics mcIn = getMatrixCharacteristics(varName); return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryRDD, mcIn, null); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockFrame.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockFrame.java b/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockFrame.java index 5812cf6..e36dfb4 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockFrame.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockFrame.java @@ -20,7 +20,8 @@ package org.apache.sysml.api.mlcontext; import 
org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; @@ -45,7 +46,7 @@ public class BinaryBlockFrame { * @param frameMetadata * frame metadata, such as number of rows and columns */ - public BinaryBlockFrame(DataFrame dataFrame, FrameMetadata frameMetadata) { + public BinaryBlockFrame(Dataset<Row> dataFrame, FrameMetadata frameMetadata) { this.frameMetadata = frameMetadata; binaryBlocks = MLContextConversionUtil.dataFrameToFrameBinaryBlocks(dataFrame, frameMetadata); } @@ -61,7 +62,7 @@ public class BinaryBlockFrame { * @param numCols * the number of columns */ - public BinaryBlockFrame(DataFrame dataFrame, long numRows, long numCols) { + public BinaryBlockFrame(Dataset<Row> dataFrame, long numRows, long numCols) { this(dataFrame, new FrameMetadata(numRows, numCols, ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize())); } @@ -72,7 +73,7 @@ public class BinaryBlockFrame { * @param dataFrame * the Spark DataFrame */ - public BinaryBlockFrame(DataFrame dataFrame) { + public BinaryBlockFrame(Dataset<Row> dataFrame) { this(dataFrame, new FrameMetadata()); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockMatrix.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockMatrix.java b/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockMatrix.java index abbdcc0..abfad09 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockMatrix.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/BinaryBlockMatrix.java @@ -20,7 +20,8 @@ package org.apache.sysml.api.mlcontext; import org.apache.spark.api.java.JavaPairRDD; -import 
org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; @@ -45,7 +46,7 @@ public class BinaryBlockMatrix { * @param matrixMetadata * matrix metadata, such as number of rows and columns */ - public BinaryBlockMatrix(DataFrame dataFrame, MatrixMetadata matrixMetadata) { + public BinaryBlockMatrix(Dataset<Row> dataFrame, MatrixMetadata matrixMetadata) { this.matrixMetadata = matrixMetadata; binaryBlocks = MLContextConversionUtil.dataFrameToMatrixBinaryBlocks(dataFrame, matrixMetadata); } @@ -61,7 +62,7 @@ public class BinaryBlockMatrix { * @param numCols * the number of columns */ - public BinaryBlockMatrix(DataFrame dataFrame, long numRows, long numCols) { + public BinaryBlockMatrix(Dataset<Row> dataFrame, long numRows, long numCols) { this(dataFrame, new MatrixMetadata(numRows, numCols, ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize())); } @@ -72,7 +73,7 @@ public class BinaryBlockMatrix { * @param dataFrame * the Spark DataFrame */ - public BinaryBlockMatrix(DataFrame dataFrame) { + public BinaryBlockMatrix(Dataset<Row> dataFrame) { this(dataFrame, new MatrixMetadata()); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/Frame.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/Frame.java b/src/main/java/org/apache/sysml/api/mlcontext/Frame.java index 85dca64..6ca911d 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/Frame.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/Frame.java @@ -21,7 +21,8 @@ package org.apache.sysml.api.mlcontext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.DataFrame; 
+import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; @@ -98,7 +99,7 @@ public class Frame { * * @return the frame as a {@code DataFrame} */ - public DataFrame toDF() { + public Dataset<Row> toDF() { return MLContextConversionUtil.frameObjectToDataFrame(frameObject, sparkExecutionContext); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java index 6e3a19a..ca853ef 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java @@ -35,7 +35,8 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.mllib.linalg.VectorUDT; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -306,7 +307,7 @@ public class MLContextConversionUtil { * @return the {@code DataFrame} matrix converted to a converted to a * {@code MatrixObject} */ - public static MatrixObject dataFrameToMatrixObject(String variableName, DataFrame dataFrame) { + public static MatrixObject dataFrameToMatrixObject(String variableName, Dataset<Row> dataFrame) { return dataFrameToMatrixObject(variableName, dataFrame, null); } @@ -322,7 +323,7 @@ public class MLContextConversionUtil { * @return the {@code DataFrame} matrix converted to a 
converted to a * {@code MatrixObject} */ - public static MatrixObject dataFrameToMatrixObject(String variableName, DataFrame dataFrame, + public static MatrixObject dataFrameToMatrixObject(String variableName, Dataset<Row> dataFrame, MatrixMetadata matrixMetadata) { matrixMetadata = (matrixMetadata!=null) ? matrixMetadata : new MatrixMetadata(); @@ -340,7 +341,7 @@ public class MLContextConversionUtil { * @return the {@code DataFrame} matrix converted to a converted to a * {@code FrameObject} */ - public static FrameObject dataFrameToFrameObject(String variableName, DataFrame dataFrame) { + public static FrameObject dataFrameToFrameObject(String variableName, Dataset<Row> dataFrame) { return dataFrameToFrameObject(variableName, dataFrame, null); } @@ -356,7 +357,7 @@ public class MLContextConversionUtil { * @return the {@code DataFrame} frame converted to a converted to a * {@code FrameObject} */ - public static FrameObject dataFrameToFrameObject(String variableName, DataFrame dataFrame, FrameMetadata frameMetadata) + public static FrameObject dataFrameToFrameObject(String variableName, Dataset<Row> dataFrame, FrameMetadata frameMetadata) { try { //setup meta data and java spark context @@ -395,7 +396,7 @@ public class MLContextConversionUtil { * {@code JavaPairRDD<MatrixIndexes, * MatrixBlock>} binary-block matrix */ - public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToMatrixBinaryBlocks(DataFrame dataFrame) { + public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToMatrixBinaryBlocks(Dataset<Row> dataFrame) { return dataFrameToMatrixBinaryBlocks(dataFrame, null); } @@ -412,7 +413,7 @@ public class MLContextConversionUtil { * MatrixBlock>} binary-block matrix */ public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToMatrixBinaryBlocks( - DataFrame dataFrame, MatrixMetadata matrixMetadata) + Dataset<Row> dataFrame, MatrixMetadata matrixMetadata) { //handle meta data determineMatrixFormatIfNeeded(dataFrame, matrixMetadata); @@ 
-447,7 +448,7 @@ public class MLContextConversionUtil { * {@code JavaPairRDD<Long, * FrameBlock>} binary-block frame */ - public static JavaPairRDD<Long, FrameBlock> dataFrameToFrameBinaryBlocks(DataFrame dataFrame, + public static JavaPairRDD<Long, FrameBlock> dataFrameToFrameBinaryBlocks(Dataset<Row> dataFrame, FrameMetadata frameMetadata) { throw new MLContextException("dataFrameToFrameBinaryBlocks is unimplemented"); } @@ -461,7 +462,7 @@ public class MLContextConversionUtil { * @param matrixMetadata * the matrix metadata, if available */ - public static void determineMatrixFormatIfNeeded(DataFrame dataFrame, MatrixMetadata matrixMetadata) { + public static void determineMatrixFormatIfNeeded(Dataset<Row> dataFrame, MatrixMetadata matrixMetadata) { MatrixFormat matrixFormat = matrixMetadata.getMatrixFormat(); if (matrixFormat != null) { return; @@ -505,7 +506,7 @@ public class MLContextConversionUtil { * @param frameMetadata * the frame metadata, if available */ - public static void determineFrameFormatIfNeeded(DataFrame dataFrame, FrameMetadata frameMetadata) { + public static void determineFrameFormatIfNeeded(Dataset<Row> dataFrame, FrameMetadata frameMetadata) { FrameFormat frameFormat = frameMetadata.getFrameFormat(); if (frameFormat != null) { return; @@ -1229,7 +1230,7 @@ public class MLContextConversionUtil { * @param isVectorDF is the DataFrame a vector DataFrame? 
* @return the {@code MatrixObject} converted to a {@code DataFrame} */ - public static DataFrame matrixObjectToDataFrame(MatrixObject matrixObject, + public static Dataset<Row> matrixObjectToDataFrame(MatrixObject matrixObject, SparkExecutionContext sparkExecutionContext, boolean isVectorDF) { try { @SuppressWarnings("unchecked") @@ -1256,7 +1257,7 @@ public class MLContextConversionUtil { * the Spark execution context * @return the {@code FrameObject} converted to a {@code DataFrame} */ - public static DataFrame frameObjectToDataFrame(FrameObject frameObject, + public static Dataset<Row> frameObjectToDataFrame(FrameObject frameObject, SparkExecutionContext sparkExecutionContext) { try { @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java index b37654c..74e17ee 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java @@ -40,7 +40,8 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.mllib.linalg.VectorUDT; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; @@ -79,7 +80,7 @@ public final class MLContextUtil { * Complex data types supported by the MLContext API */ @SuppressWarnings("rawtypes") - public static final Class[] COMPLEX_DATA_TYPES = { JavaRDD.class, RDD.class, DataFrame.class, + public static final Class[] COMPLEX_DATA_TYPES = { JavaRDD.class, 
RDD.class, Dataset.class, BinaryBlockMatrix.class, BinaryBlockFrame.class, Matrix.class, Frame.class, (new double[][] {}).getClass(), MatrixBlock.class, URL.class }; @@ -491,8 +492,8 @@ public final class MLContextUtil { } else if (value instanceof FrameBlock) { FrameBlock frameBlock = (FrameBlock) value; return MLContextConversionUtil.frameBlockToFrameObject(name, frameBlock, (FrameMetadata) metadata); - } else if (value instanceof DataFrame) { - DataFrame dataFrame = (DataFrame) value; + } else if (value instanceof Dataset<?>) { + Dataset<Row> dataFrame = (Dataset<Row>) value; if (hasMatrixMetadata) { return MLContextConversionUtil.dataFrameToMatrixObject(name, dataFrame, (MatrixMetadata) metadata); @@ -578,7 +579,7 @@ public final class MLContextUtil { * @return {@code true} if the DataFrame appears to be a matrix, * {@code false} otherwise */ - public static boolean doesDataFrameLookLikeMatrix(DataFrame df) { + public static boolean doesDataFrameLookLikeMatrix(Dataset<Row> df) { StructType schema = df.schema(); StructField[] fields = schema.fields(); if (fields == null) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java b/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java index 9ceef11..17cac1f 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/MLResults.java @@ -23,7 +23,8 @@ import java.util.Set; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import 
org.apache.sysml.runtime.controlprogram.caching.MatrixObject; @@ -283,7 +284,7 @@ public class MLResults { * the name of the output * @return the output as a {@code DataFrame} */ - public DataFrame getDataFrame(String outputName) { + public Dataset<Row> getDataFrame(String outputName) { if (isMatrixObject(outputName)) { MatrixObject mo = getMatrixObject(outputName); return MLContextConversionUtil.matrixObjectToDataFrame(mo, sparkExecutionContext, false); @@ -349,7 +350,7 @@ public class MLResults { * @return the output as a {@code DataFrame} of doubles or vectors with an * ID column */ - public DataFrame getDataFrame(String outputName, boolean isVectorDF) { + public Dataset<Row> getDataFrame(String outputName, boolean isVectorDF) { if (isFrameObject(outputName)) { throw new MLContextException("This method currently supports only matrices"); } @@ -375,7 +376,7 @@ public class MLResults { * the name of the output * @return the output as a {@code DataFrame} of doubles with an ID column */ - public DataFrame getDataFrameDoubleWithIDColumn(String outputName) { + public Dataset<Row> getDataFrameDoubleWithIDColumn(String outputName) { if (isFrameObject(outputName)) { throw new MLContextException("This method currently supports only matrices"); } @@ -401,7 +402,7 @@ public class MLResults { * the name of the output * @return the output as a {@code DataFrame} of vectors with an ID column */ - public DataFrame getDataFrameVectorWithIDColumn(String outputName) { + public Dataset<Row> getDataFrameVectorWithIDColumn(String outputName) { if (isFrameObject(outputName)) { throw new MLContextException("This method currently supports only matrices"); } @@ -427,12 +428,12 @@ public class MLResults { * the name of the output * @return the output as a {@code DataFrame} of doubles with no ID column */ - public DataFrame getDataFrameDoubleNoIDColumn(String outputName) { + public Dataset<Row> getDataFrameDoubleNoIDColumn(String outputName) { if (isFrameObject(outputName)) { throw new 
MLContextException("This method currently supports only matrices"); } MatrixObject mo = getMatrixObject(outputName); - DataFrame df = MLContextConversionUtil.matrixObjectToDataFrame(mo, sparkExecutionContext, false); + Dataset<Row> df = MLContextConversionUtil.matrixObjectToDataFrame(mo, sparkExecutionContext, false); return df.drop(RDDConverterUtils.DF_ID_COLUMN); } @@ -454,12 +455,12 @@ public class MLResults { * the name of the output * @return the output as a {@code DataFrame} of vectors with no ID column */ - public DataFrame getDataFrameVectorNoIDColumn(String outputName) { + public Dataset<Row> getDataFrameVectorNoIDColumn(String outputName) { if (isFrameObject(outputName)) { throw new MLContextException("This method currently supports only matrices"); } MatrixObject mo = getMatrixObject(outputName); - DataFrame df = MLContextConversionUtil.matrixObjectToDataFrame(mo, sparkExecutionContext, true); + Dataset<Row> df = MLContextConversionUtil.matrixObjectToDataFrame(mo, sparkExecutionContext, true); return df.drop(RDDConverterUtils.DF_ID_COLUMN); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/api/mlcontext/Matrix.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/Matrix.java b/src/main/java/org/apache/sysml/api/mlcontext/Matrix.java index aa8033d..9952350 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/Matrix.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/Matrix.java @@ -21,7 +21,8 @@ package org.apache.sysml.api.mlcontext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import 
org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; @@ -102,7 +103,7 @@ public class Matrix { * * @return the matrix as a {@code DataFrame} of doubles with an ID column */ - public DataFrame toDF() { + public Dataset<Row> toDF() { return MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, sparkExecutionContext, false); } @@ -111,7 +112,7 @@ public class Matrix { * * @return the matrix as a {@code DataFrame} of doubles with an ID column */ - public DataFrame toDFDoubleWithIDColumn() { + public Dataset<Row> toDFDoubleWithIDColumn() { return MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, sparkExecutionContext, false); } @@ -120,8 +121,8 @@ public class Matrix { * * @return the matrix as a {@code DataFrame} of doubles with no ID column */ - public DataFrame toDFDoubleNoIDColumn() { - DataFrame df = MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, sparkExecutionContext, false); + public Dataset<Row> toDFDoubleNoIDColumn() { + Dataset<Row> df = MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, sparkExecutionContext, false); return df.drop(RDDConverterUtils.DF_ID_COLUMN); } @@ -130,7 +131,7 @@ public class Matrix { * * @return the matrix as a {@code DataFrame} of vectors with an ID column */ - public DataFrame toDFVectorWithIDColumn() { + public Dataset<Row> toDFVectorWithIDColumn() { return MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, sparkExecutionContext, true); } @@ -139,8 +140,8 @@ public class Matrix { * * @return the matrix as a {@code DataFrame} of vectors with no ID column */ - public DataFrame toDFVectorNoIDColumn() { - DataFrame df = MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, sparkExecutionContext, true); + public Dataset<Row> toDFVectorNoIDColumn() { + Dataset<Row> df = MLContextConversionUtil.matrixObjectToDataFrame(matrixObject, sparkExecutionContext, true); return df.drop(RDDConverterUtils.DF_ID_COLUMN); } 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java index 8f33081..9e690d5 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkMapper.java @@ -20,12 +20,12 @@ package org.apache.sysml.runtime.controlprogram.parfor; import java.io.IOException; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import org.apache.hadoop.io.Writable; import org.apache.spark.api.java.function.PairFlatMapFunction; - import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; @@ -73,7 +73,7 @@ public class DataPartitionerRemoteSparkMapper extends ParWorker implements PairF @Override - public Iterable<Tuple2<Long, Writable>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) + public Iterator<Tuple2<Long, Writable>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception { List<Tuple2<Long, Writable>> ret = new LinkedList<Tuple2<Long, Writable>>(); @@ -148,7 +148,7 @@ public class DataPartitionerRemoteSparkMapper extends ParWorker implements PairF throw new DMLRuntimeException("Unsupported partition format: "+_dpf); } - return ret; + return ret.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index 2a0dc7d..c973115 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -94,7 +94,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF } @Override - public Iterable<Tuple2<Long, String>> call(Iterator<Tuple2<Long, Iterable<Writable>>> arg0) + public Iterator<Tuple2<Long, String>> call(Iterator<Tuple2<Long, Iterable<Writable>>> arg0) throws Exception { ArrayList<Tuple2<Long,String>> ret = new ArrayList<Tuple2<Long,String>>(); @@ -137,7 +137,7 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF ret.add(new Tuple2<Long,String>(_workerID, val)); } - return ret; + return ret.iterator(); } private void configureWorker( long ID ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java index 4dc5126..75e5137 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java @@ -21,11 +21,11 @@ package org.apache.sysml.runtime.controlprogram.parfor; import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.Accumulator; import 
org.apache.spark.TaskContext; import org.apache.spark.api.java.function.PairFlatMapFunction; - import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler; @@ -60,7 +60,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun } @Override - public Iterable<Tuple2<Long, String>> call(Task arg0) + public Iterator<Tuple2<Long, String>> call(Task arg0) throws Exception { //lazy parworker initialization @@ -82,7 +82,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun for( String val : tmp ) ret.add(new Tuple2<Long,String>(_workerID, val)); - return ret; + return ret.iterator(); } private void configureWorker( long ID ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java index 3d16391..092af3b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java @@ -176,7 +176,7 @@ public class AppendGSPInstruction extends BinarySPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception { //common preparation @@ -240,7 +240,7 @@ public class AppendGSPInstruction extends BinarySPInstruction } } - return retVal; + return retVal.iterator(); } } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java index 5a26167..ab0c3ed 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.instructions.spark; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; @@ -134,7 +135,7 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) throws Exception { ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>(); @@ -167,7 +168,7 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction ret.add(new Tuple2<MatrixIndexes,MatrixBlock>(tmpix, tmpblk)); } - return ret; + return ret.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java index 6e25446..e19b617 100644 --- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/FrameIndexingSPInstruction.java @@ -208,13 +208,13 @@ public class FrameIndexingSPInstruction extends IndexingSPInstruction } @Override - public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<Long, FrameBlock> rightKV) + public Iterator<Tuple2<Long, FrameBlock>> call(Tuple2<Long, FrameBlock> rightKV) throws Exception { Pair<Long,FrameBlock> in = SparkUtils.toIndexedFrameBlock(rightKV); ArrayList<Pair<Long,FrameBlock>> out = new ArrayList<Pair<Long,FrameBlock>>(); OperationsOnMatrixValues.performShift(in, _ixrange, _brlen, _bclen, _rlen, _clen, out); - return SparkUtils.fromIndexedFrameBlock(out); + return SparkUtils.fromIndexedFrameBlock(out).iterator(); } } @@ -237,7 +237,7 @@ public class FrameIndexingSPInstruction extends IndexingSPInstruction } @Override - public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<Long, FrameBlock> kv) + public Iterator<Tuple2<Long, FrameBlock>> call(Tuple2<Long, FrameBlock> kv) throws Exception { ArrayList<Pair<Long,FrameBlock>> out = new ArrayList<Pair<Long,FrameBlock>>(); @@ -269,7 +269,7 @@ public class FrameIndexingSPInstruction extends IndexingSPInstruction curBlockRange.rowStart = lGblStartRow + _brlen; iRowStartDest = UtilFunctions.computeCellInBlock(iRowStartDest+iMaxRowsToCopy+1, _brlen); } - return SparkUtils.fromIndexedFrameBlock(out); + return SparkUtils.fromIndexedFrameBlock(out).iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java index dfe8791..310664f 100644 --- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java @@ -414,7 +414,7 @@ public class MapmmSPInstruction extends BinarySPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) throws Exception { ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>(); @@ -458,7 +458,7 @@ public class MapmmSPInstruction extends BinarySPInstruction } } - return ret; + return ret.iterator(); } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java index 7100011..0765873 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixAppendMSPInstruction.java @@ -121,7 +121,7 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception { ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>(); @@ -183,7 +183,7 @@ public class MatrixAppendMSPInstruction extends AppendMSPInstruction ret.addAll(SparkUtils.fromIndexedMatrixBlock(outlist)); } - return ret; + return ret.iterator(); } } 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java index 8d4d296..5b1012f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java @@ -297,13 +297,13 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> rightKV) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> rightKV) throws Exception { IndexedMatrixValue in = SparkUtils.toIndexedMatrixBlock(rightKV); ArrayList<IndexedMatrixValue> out = new ArrayList<IndexedMatrixValue>(); OperationsOnMatrixValues.performShift(in, _ixrange, _brlen, _bclen, _rlen, _clen, out); - return SparkUtils.fromIndexedMatrixBlock(out); + return SparkUtils.fromIndexedMatrixBlock(out).iterator(); } } @@ -420,13 +420,13 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception { IndexedMatrixValue in = SparkUtils.toIndexedMatrixBlock(kv); ArrayList<IndexedMatrixValue> outlist = new ArrayList<IndexedMatrixValue>(); OperationsOnMatrixValues.performSlice(in, _ixrange, _brlen, _bclen, outlist); - return SparkUtils.fromIndexedMatrixBlock(outlist); + return SparkUtils.fromIndexedMatrixBlock(outlist).iterator(); } } 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java index 6b27240..5941285 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.instructions.spark; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFlatMapFunction; @@ -126,7 +127,7 @@ public class MatrixReshapeSPInstruction extends UnarySPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) throws Exception { //input conversion (for libmatrixreorg compatibility) @@ -138,7 +139,7 @@ public class MatrixReshapeSPInstruction extends UnarySPInstruction out, _mcOut.getRows(), _mcOut.getCols(), _mcOut.getRowsPerBlock(), _mcOut.getColsPerBlock(), _byrow); //output conversion (for compatibility w/ rdd schema) - return SparkUtils.fromIndexedMatrixBlock(out); + return SparkUtils.fromIndexedMatrixBlock(out).iterator(); } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java index 319d833..daa1ce5 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java @@ -210,7 +210,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI } @Override - public Iterable<Tuple2<Integer, Object>> call(Iterator<Tuple2<Long, FrameBlock>> iter) + public Iterator<Tuple2<Integer, Object>> call(Iterator<Tuple2<Long, FrameBlock>> iter) throws Exception { //build meta data (e.g., recode maps) @@ -226,7 +226,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI ret.add(new Tuple2<Integer,Object>(e1.getKey(), token)); _raEncoder.getCPRecodeMapsPartial().clear(); - return ret; + return ret.iterator(); } } @@ -249,7 +249,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI } @Override - public Iterable<String> call(Tuple2<Integer, Iterable<Object>> arg0) + public Iterator<String> call(Tuple2<Integer, Iterable<Object>> arg0) throws Exception { String colID = String.valueOf(arg0._1()); @@ -271,7 +271,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI } _accMax.add(rowID-1); - return ret; + return ret.iterator(); } } @@ -306,7 +306,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI } @Override - public Iterable<Tuple2<Integer, ColumnMetadata>> call(Iterator<Tuple2<Long, FrameBlock>> iter) + public Iterator<Tuple2<Integer, ColumnMetadata>> call(Iterator<Tuple2<Long, FrameBlock>> iter) throws Exception { //build meta data (e.g., histograms and means) @@ -335,7 +335,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI } } - return ret; + return ret.iterator(); } } @@ -350,7 +350,7 @@ public class 
MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI } @Override - public Iterable<String> call(Tuple2<Integer, Iterable<ColumnMetadata>> arg0) + public Iterator<String> call(Tuple2<Integer, Iterable<ColumnMetadata>> arg0) throws Exception { int colix = arg0._1(); @@ -392,7 +392,7 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI ret.add("-2 " + colix + " " + iter.next().getMvValue()); } - return ret; + return ret.iterator(); } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java index 8aaa8b6..5efba81 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFlatMapFunction; @@ -178,7 +179,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception { PartitionedBlock<MatrixBlock> pm = _pbc.value(); @@ -204,7 +205,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(ixOut, blkOut)); } - return ret; + return ret.iterator(); } } } 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java index 06decc0..d7f4f7d 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -543,7 +544,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, Tuple2<MatrixBlock, MatrixBlock>> arg0) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, Tuple2<MatrixBlock, MatrixBlock>> arg0) throws Exception { //prepare inputs (for internal api compatibility) @@ -555,7 +556,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction LibMatrixReorg.rmempty(data, offsets, _rmRows, _len, _brlen, _bclen, out); //prepare and return outputs - return SparkUtils.fromIndexedMatrixBlock(out); + return SparkUtils.fromIndexedMatrixBlock(out).iterator(); } } @@ -580,7 +581,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception { 
//prepare inputs (for internal api compatibility) @@ -597,7 +598,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction LibMatrixReorg.rmempty(data, offsets, _rmRows, _len, _brlen, _bclen, out); //prepare and return outputs - return SparkUtils.fromIndexedMatrixBlock(out); + return SparkUtils.fromIndexedMatrixBlock(out).iterator(); } } @@ -623,7 +624,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception { //prepare inputs (for internal api compatibility) @@ -634,7 +635,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction LibMatrixReorg.rexpand(data, _maxVal, _dirRows, _cast, _ignore, _brlen, _bclen, out); //prepare and return outputs - return SparkUtils.fromIndexedMatrixBlock(out); + return SparkUtils.fromIndexedMatrixBlock(out).iterator(); } } @@ -658,7 +659,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception { //get all inputs @@ -672,7 +673,7 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction OperationsOnMatrixValues.performMapGroupedAggregate(_op, in1, groups, _ngroups, _brlen, _bclen, outlist); //output all result blocks - return SparkUtils.fromIndexedMatrixBlock(outlist); + return SparkUtils.fromIndexedMatrixBlock(outlist).iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java index c3d2c27..ce87c46 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.PairFlatMapFunction; @@ -130,7 +131,7 @@ public class PmmSPInstruction extends BinarySPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) throws Exception { ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>(); @@ -170,7 +171,7 @@ public class PmmSPInstruction extends BinarySPInstruction ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(new MatrixIndexes(rowIX2, ixIn.getColumnIndex()), out2)); } - return ret; + return ret.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java index 063b6e7..0ae8a67 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java @@ -548,7 +548,7 @@ public class 
QuaternarySPInstruction extends ComputationSPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) throws Exception { LinkedList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new LinkedList<Tuple2<MatrixIndexes, MatrixBlock>>(); @@ -579,7 +579,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction } //output list of new tuples - return ret; + return ret.iterator(); } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java index 1f66825..8df8b26 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.Serializable; import java.util.ArrayList; +import java.util.Iterator; import java.util.Random; import org.apache.commons.math3.distribution.PoissonDistribution; @@ -626,7 +627,7 @@ public class RandSPInstruction extends UnarySPInstruction } @Override - public Iterable<Double> call(SampleTask t) + public Iterator<Double> call(SampleTask t) throws Exception { long st = t.range_start; @@ -660,7 +661,7 @@ public class RandSPInstruction extends UnarySPInstruction retList.add((double) i); } } - return retList; + return retList.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java index fb4be29..62ffbce 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.instructions.spark; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; @@ -239,7 +240,7 @@ public class ReorgSPInstruction extends UnarySPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) throws Exception { ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>(); @@ -264,7 +265,7 @@ public class ReorgSPInstruction extends UnarySPInstruction } } - return ret; + return ret.iterator(); } } @@ -281,7 +282,7 @@ public class ReorgSPInstruction extends UnarySPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) throws Exception { //construct input @@ -292,7 +293,7 @@ public class ReorgSPInstruction extends UnarySPInstruction LibMatrixReorg.rev(in, _mcIn.getRows(), _mcIn.getRowsPerBlock(), out); //construct output - return SparkUtils.fromIndexedMatrixBlock(out); + return SparkUtils.fromIndexedMatrixBlock(out).iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java index 0e01bb4..e6b5755 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.instructions.spark; +import java.util.Iterator; import java.util.LinkedList; import org.apache.spark.api.java.JavaPairRDD; @@ -119,7 +120,7 @@ public class RmmSPInstruction extends BinarySPInstruction } @Override - public Iterable<Tuple2<TripleIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) + public Iterator<Tuple2<TripleIndexes, MatrixBlock>> call( Tuple2<MatrixIndexes, MatrixBlock> arg0 ) throws Exception { LinkedList<Tuple2<TripleIndexes, MatrixBlock>> ret = new LinkedList<Tuple2<TripleIndexes, MatrixBlock>>(); @@ -152,7 +153,7 @@ public class RmmSPInstruction extends BinarySPInstruction } //output list of new tuples - return ret; + return ret.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/TernarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/TernarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/TernarySPInstruction.java index 8b0eb42..a25dcf0 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/TernarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/TernarySPInstruction.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.instructions.spark; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.api.java.JavaPairRDD; import 
org.apache.spark.api.java.function.Function; @@ -258,7 +259,7 @@ public class TernarySPInstruction extends ComputationSPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, Double>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) + public Iterator<Tuple2<MatrixIndexes, Double>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception { MatrixIndexes ix = arg0._1(); @@ -279,7 +280,7 @@ public class TernarySPInstruction extends ComputationSPInstruction retVal.add(new Tuple2<MatrixIndexes,Double>(p.getKey(), p.getValue())); } - return retVal; + return retVal.iterator(); } } @@ -460,7 +461,7 @@ public class TernarySPInstruction extends ComputationSPInstruction @SuppressWarnings("deprecation") @Override - public Iterable<Tuple2<MatrixIndexes, Double>> call(CTableMap ctableMap) + public Iterator<Tuple2<MatrixIndexes, Double>> call(CTableMap ctableMap) throws Exception { ArrayList<Tuple2<MatrixIndexes, Double>> retVal = new ArrayList<Tuple2<MatrixIndexes, Double>>(); @@ -472,7 +473,7 @@ public class TernarySPInstruction extends ComputationSPInstruction // retVal.add(new Tuple2<MatrixIndexes, MatrixCell>(blockIndexes, cell)); retVal.add(new Tuple2<MatrixIndexes, Double>(new MatrixIndexes(i, j), v)); } - return retVal; + return retVal.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java index 8fe9a7d..da917b5 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark; import java.util.ArrayList; +import 
java.util.Iterator; import java.util.List; import org.apache.spark.api.java.JavaPairRDD; @@ -145,7 +146,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception { List<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>(); @@ -174,7 +175,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction ret.add(new Tuple2<MatrixIndexes,MatrixBlock>(ixout3, out3)); } - return ret; + return ret.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java index 792e0b6..cd6e58d 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java @@ -32,7 +32,7 @@ public class ConvertFrameBlockToIJVLines implements FlatMapFunction<Tuple2<Long, private static final long serialVersionUID = 1803516615963340115L; @Override - public Iterable<String> call(Tuple2<Long, FrameBlock> kv) + public Iterator<String> call(Tuple2<Long, FrameBlock> kv) throws Exception { long rowoffset = kv._1; @@ -68,6 +68,6 @@ public class ConvertFrameBlockToIJVLines implements FlatMapFunction<Tuple2<Long, } } - return cells; + return cells.iterator(); } } 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertMatrixBlockToIJVLines.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertMatrixBlockToIJVLines.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertMatrixBlockToIJVLines.java index 788615a..621fb06 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertMatrixBlockToIJVLines.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertMatrixBlockToIJVLines.java @@ -39,12 +39,12 @@ public class ConvertMatrixBlockToIJVLines implements FlatMapFunction<Tuple2<Matr } @Override - public Iterable<String> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception { + public Iterator<String> call(Tuple2<MatrixIndexes, MatrixBlock> kv) throws Exception { final BinaryBlockToTextCellConverter converter = new BinaryBlockToTextCellConverter(); converter.setBlockSize(brlen, bclen); converter.convert(kv._1, kv._2); - return new Iterable<String>() { + Iterable<String> ret = new Iterable<String>() { @Override public Iterator<String> iterator() { return new Iterator<String>() { @@ -64,6 +64,8 @@ public class ConvertMatrixBlockToIJVLines implements FlatMapFunction<Tuple2<Matr }; } }; + + return ret.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java index cc361c6..61b4385 100644 --- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.instructions.spark.functions; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.api.java.function.PairFlatMapFunction; @@ -60,7 +61,7 @@ public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2< } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception { MatrixIndexes ixIn = arg0._1(); @@ -107,7 +108,7 @@ public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2< retVal.add(new Tuple2<MatrixIndexes, MatrixBlock>(indx, blk)); } } - return retVal; + return retVal.iterator(); } private long getEndGlobalIndex(long blockIndex, boolean isIn, boolean isRow) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java index 1ab8ecd..56c1c46 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroup.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.instructions.spark.functions; import java.io.Serializable; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.api.java.function.PairFlatMapFunction; @@ -109,7 +110,7 @@ public abstract class ExtractGroup implements 
Serializable } @Override - public Iterable<Tuple2<MatrixIndexes, WeightedCell>> call( + public Iterator<Tuple2<MatrixIndexes, WeightedCell>> call( Tuple2<MatrixIndexes, Tuple2<MatrixBlock, MatrixBlock>> arg) throws Exception { @@ -117,7 +118,7 @@ public abstract class ExtractGroup implements Serializable MatrixBlock group = arg._2._1; MatrixBlock target = arg._2._2; - return execute(ix, group, target); + return execute(ix, group, target).iterator(); } } @@ -133,7 +134,7 @@ public abstract class ExtractGroup implements Serializable } @Override - public Iterable<Tuple2<MatrixIndexes, WeightedCell>> call( + public Iterator<Tuple2<MatrixIndexes, WeightedCell>> call( Tuple2<MatrixIndexes, MatrixBlock> arg) throws Exception { @@ -141,7 +142,7 @@ public abstract class ExtractGroup implements Serializable MatrixBlock group = _pbm.getBlock((int)ix.getRowIndex(), 1); MatrixBlock target = arg._2; - return execute(ix, group, target); + return execute(ix, group, target).iterator(); } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroupNWeights.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroupNWeights.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroupNWeights.java index 372ebea..d040dbc 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroupNWeights.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ExtractGroupNWeights.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.instructions.spark.functions; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.api.java.function.PairFlatMapFunction; @@ -35,7 +36,7 @@ public class ExtractGroupNWeights implements PairFlatMapFunction<Tuple2<MatrixIn private static final long 
serialVersionUID = -188180042997588072L; @Override - public Iterable<Tuple2<MatrixIndexes, WeightedCell>> call( + public Iterator<Tuple2<MatrixIndexes, WeightedCell>> call( Tuple2<MatrixIndexes, Tuple2<Tuple2<MatrixBlock, MatrixBlock>, MatrixBlock>> arg) throws Exception { @@ -62,6 +63,6 @@ public class ExtractGroupNWeights implements PairFlatMapFunction<Tuple2<MatrixIn groupValuePairs.add(new Tuple2<MatrixIndexes, WeightedCell>(ix, weightedCell)); } - return groupValuePairs; + return groupValuePairs.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ReplicateVectorFunction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ReplicateVectorFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ReplicateVectorFunction.java index 329d322..8c6eb92 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ReplicateVectorFunction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ReplicateVectorFunction.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.instructions.spark.functions; import java.util.ArrayList; +import java.util.Iterator; import org.apache.spark.api.java.function.PairFlatMapFunction; @@ -44,7 +45,7 @@ public class ReplicateVectorFunction implements PairFlatMapFunction<Tuple2<Matri } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0) throws Exception { MatrixIndexes ix = arg0._1(); @@ -66,6 +67,6 @@ public class ReplicateVectorFunction implements PairFlatMapFunction<Tuple2<Matri retVal.add(new Tuple2<MatrixIndexes, MatrixBlock>(new MatrixIndexes(ix.getRowIndex(), i), mb)); } - return retVal; + return 
retVal.iterator(); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java index 3bf2f67..0dd83be 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/SparkListener.java @@ -172,8 +172,9 @@ public class SparkListener extends RDDOperationGraphListener { synchronized(currentInstructions) { if(stageTaskMapping.containsKey(stageID)) { - Option<String> errorMessage = Option.apply(null); // TODO - TaskUIData taskData = new TaskUIData(taskEnd.taskInfo(), Option.apply(taskEnd.taskMetrics()), errorMessage); + //Option<String> errorMessage = Option.apply(null); // TODO + //TaskUIData taskData = new TaskUIData(taskEnd.taskInfo(), Option.apply(taskEnd.taskMetrics()), errorMessage); + TaskUIData taskData = new TaskUIData(taskEnd.taskInfo(), null); //TODO stageTaskMapping.get(stageID).add(taskData); } else { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/a4c7be78/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 619089b..3196f09 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -39,7 +39,7 @@ import 
org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.linalg.Vector; import org.apache.spark.mllib.linalg.VectorUDT; -import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; @@ -230,14 +230,14 @@ public class FrameRDDConverterUtils // DataFrame <--> Binary block public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc, - DataFrame df, MatrixCharacteristics mc, boolean containsID) + Dataset<Row> df, MatrixCharacteristics mc, boolean containsID) throws DMLRuntimeException { return dataFrameToBinaryBlock(sc, df, mc, containsID, new Pair<String[], ValueType[]>()); } public static JavaPairRDD<Long, FrameBlock> dataFrameToBinaryBlock(JavaSparkContext sc, - DataFrame df, MatrixCharacteristics mc, boolean containsID, Pair<String[],ValueType[]> out) + Dataset<Row> df, MatrixCharacteristics mc, boolean containsID, Pair<String[],ValueType[]> out) throws DMLRuntimeException { //determine unknown dimensions if required @@ -267,7 +267,7 @@ public class FrameRDDConverterUtils new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID, colVect)); } - public static DataFrame binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<Long,FrameBlock> in, + public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mc, ValueType[] schema) { if( !mc.colsKnown() ) @@ -460,7 +460,7 @@ public class FrameRDDConverterUtils private static final long serialVersionUID = -5789003262381127469L; @Override - public Iterable<Long> call(Iterator<Long> arg0) throws Exception + public Iterator<Long> call(Iterator<Long> arg0) throws Exception { long max = 0; while( max >= 0 && arg0.hasNext() ) { @@ -470,7 +470,7 @@ public class FrameRDDConverterUtils ArrayList<Long> ret = new 
ArrayList<Long>(); ret.add(max); - return ret; + return ret.iterator(); } } @@ -568,7 +568,7 @@ public class FrameRDDConverterUtils } @Override - public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0) + public Iterator<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Text,Long>> arg0) throws Exception { ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>(); @@ -612,7 +612,7 @@ public class FrameRDDConverterUtils //flush last blocks flushBlocksToList(ix, fb, ret); - return ret; + return ret.iterator(); } // Creates new state of empty column blocks for current global row index. @@ -658,7 +658,7 @@ public class FrameRDDConverterUtils } @Override - public Iterable<String> call(Tuple2<Long, FrameBlock> arg0) + public Iterator<String> call(Tuple2<Long, FrameBlock> arg0) throws Exception { Long ix = arg0._1(); @@ -705,7 +705,7 @@ public class FrameRDDConverterUtils sb.setLength(0); //reset } - return ret; + return ret.iterator(); } } @@ -734,7 +734,7 @@ public class FrameRDDConverterUtils } @Override - public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) + public Iterator<Tuple2<Long, FrameBlock>> call(Iterator<Tuple2<Row, Long>> arg0) throws Exception { ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>(); @@ -776,7 +776,7 @@ public class FrameRDDConverterUtils //flush last blocks flushBlocksToList(ix, fb, ret); - return ret; + return ret.iterator(); } private static void flushBlocksToList( Long ix, FrameBlock fb, ArrayList<Tuple2<Long,FrameBlock>> ret ) @@ -792,7 +792,7 @@ public class FrameRDDConverterUtils private static final long serialVersionUID = 8093340778966667460L; @Override - public Iterable<Row> call(Tuple2<Long, FrameBlock> arg0) + public Iterator<Row> call(Tuple2<Long, FrameBlock> arg0) throws Exception { long rowIndex = arg0._1(); @@ -810,7 +810,7 @@ public class FrameRDDConverterUtils ret.add(RowFactory.create(row)); } - return ret; + return 
ret.iterator(); } } @@ -855,7 +855,7 @@ public class FrameRDDConverterUtils } @Override - public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Text> arg0) + public Iterator<Tuple2<Long, FrameBlock>> call(Iterator<Text> arg0) throws Exception { ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>(); @@ -886,7 +886,7 @@ public class FrameRDDConverterUtils //final flush buffer flushBufferToList(rbuff, ret); - return ret; + return ret.iterator(); } } @@ -911,7 +911,7 @@ public class FrameRDDConverterUtils } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes,MatrixBlock> arg0) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes,MatrixBlock> arg0) throws Exception { ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>(); @@ -939,7 +939,7 @@ public class FrameRDDConverterUtils ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(ixout,out)); } - return ret; + return ret.iterator(); } /** @@ -992,7 +992,7 @@ public class FrameRDDConverterUtils } @Override - public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<Long, FrameBlock> arg0) + public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<Long, FrameBlock> arg0) throws Exception { long rowIndex = arg0._1(); @@ -1028,7 +1028,7 @@ public class FrameRDDConverterUtils } } - return ret; + return ret.iterator(); } }