Repository: incubator-systemml
Updated Branches:
  refs/heads/master 35319ba70 -> 94fd4d741


Adding converter utils to extract specific columns from DataFrame

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/94fd4d74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/94fd4d74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/94fd4d74

Branch: refs/heads/master
Commit: 94fd4d7413490c66174336e00355ef12f7b56f47
Parents: 35319ba
Author: Niketan Pansare <npan...@us.ibm.com>
Authored: Wed Dec 9 15:06:01 2015 -0800
Committer: Niketan Pansare <npan...@us.ibm.com>
Committed: Wed Dec 9 15:06:01 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLContext.java    |  3 ---
 .../spark/utils/RDDConverterUtilsExt.java       | 28 +++++++++++++++++---
 2 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/94fd4d74/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java
index bcfc976..4b9ad8d 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -248,9 +248,6 @@ public class MLContext {
        
        /**
         * Register DataFrame as input. 
-        * Current version doesnot support containsID=true.
-        * Note: for Spark 1.4.0 or higher, registerInput(varName, df.sort("ID").drop("ID"), true) = registerInput(varName, df, false)
-        * <p>
         * Marks the variable in the DML script as input variable.
         * Note that this expects a "varName = read(...)" statement in the DML script which through non-MLContext invocation
         * would have been created by reading a HDFS file.  

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/94fd4d74/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index 3a227a9..af4885a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -206,13 +206,35 @@ public class RDDConverterUtilsExt
                        throw new DMLRuntimeException("No column other than \"" + column + "\" present in the dataframe.");
                }
                
-               // Round about way to do in Java (not exposed in Spark 1.3.0): df = df.sort("ID").drop("ID");
+               // Round about way to do in Java (not exposed in Spark 1.3.0): df = df.drop("ID");
                return df.select(firstCol, scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
        }
        
+       public static DataFrame projectColumns(DataFrame df, ArrayList<String> columns) throws DMLRuntimeException {
+               ArrayList<String> columnToSelect = new ArrayList<String>();
+               for(int i = 1; i < columns.size(); i++) {
+                       columnToSelect.add(columns.get(i));
+               }
+               return df.select(columns.get(0), scala.collection.JavaConversions.asScalaBuffer(columnToSelect).toList());
+       }
+       
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
+                       DataFrame df, MatrixCharacteristics mcOut, boolean containsID) throws DMLRuntimeException {
+               return dataFrameToBinaryBlock(sc, df, mcOut, containsID, null);
+       }
+       
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
+                       DataFrame df, MatrixCharacteristics mcOut, ArrayList<String> columns) throws DMLRuntimeException {
+               return dataFrameToBinaryBlock(sc, df, mcOut, false, columns);
+       }
+       
        public static JavaPairRDD<MatrixIndexes, MatrixBlock> dataFrameToBinaryBlock(JavaSparkContext sc,
-                       DataFrame df, MatrixCharacteristics mcOut, boolean containsID)
+                       DataFrame df, MatrixCharacteristics mcOut, boolean containsID, ArrayList<String> columns)
                        throws DMLRuntimeException {
+               if(columns != null) {
+                       df = projectColumns(df, columns);
+               }
+               
                if(containsID) {
                        df = dropColumn(df.sort("ID"), "ID");
                }
@@ -276,7 +298,7 @@ public class RDDConverterUtilsExt
                        for(int i = 0; i < oldNumCols; i++) {
                                fields[i] = arg0._1.get(i);
                        }
-                       fields[oldNumCols] = new Double(arg0._2);
+                       fields[oldNumCols] = new Double(arg0._2 + 1);
                        return RowFactory.create(fields);
                }
                

Reply via email to