Repository: flink
Updated Branches:
  refs/heads/master be055b7a9 -> 615cf42b3


[FLINK-2984] [ml] Extend libSVM file format support

This closes #1504.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/615cf42b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/615cf42b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/615cf42b

Branch: refs/heads/master
Commit: 615cf42b3d9404dca3884c5abc71650a6cf91d7b
Parents: be055b7
Author: Chiwan Park <chiwanp...@apache.org>
Authored: Wed Jan 13 17:34:11 2016 +0900
Committer: Chiwan Park <chiwanp...@apache.org>
Committed: Fri Jan 15 10:19:27 2016 +0900

----------------------------------------------------------------------
 .../scala/org/apache/flink/ml/MLUtils.scala     | 40 +++++++++++---------
 .../org/apache/flink/ml/MLUtilsSuite.scala      |  4 +-
 2 files changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/615cf42b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
index 804ab5f..f4119f5 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.ml
 
-import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
 import org.apache.flink.api.java.operators.DataSink
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.ml.common.LabeledVector
 import org.apache.flink.ml.math.SparseVector
+import org.apache.flink.util.Collector
 
 /** Convenience functions for machine learning tasks
   *
@@ -53,17 +54,21 @@ object MLUtils {
     *        file
     */
   def readLibSVM(env: ExecutionEnvironment, filePath: String): DataSet[LabeledVector] = {
-    val labelCOODS = env.readTextFile(filePath).flatMap {
-      line =>
-        // remove all comments which start with a '#'
-        val commentFreeLine = line.takeWhile(_ != '#').trim
-
-        if(commentFreeLine.nonEmpty) {
-          val splits = commentFreeLine.split(' ')
-          val label = splits.head.toDouble
-          val sparseFeatures = splits.tail
-          val coos = sparseFeatures.map {
-            str =>
+    val labelCOODS = env.readTextFile(filePath).flatMap(
+      new RichFlatMapFunction[String, (Double, Array[(Int, Double)])] {
+        val splitPattern = "\\s+".r
+
+        override def flatMap(
+          line: String,
+          out: Collector[(Double, Array[(Int, Double)])]
+        ): Unit = {
+          val commentFreeLine = line.takeWhile(_ != '#').trim
+
+          if (commentFreeLine.nonEmpty) {
+            val splits = splitPattern.split(commentFreeLine)
+            val label = splits.head.toDouble
+            val sparseFeatures = splits.tail
+            val coos = sparseFeatures.flatMap { str =>
               val pair = str.split(':')
               require(pair.length == 2, "Each feature entry has to have the form <feature>:<value>")
 
@@ -71,14 +76,13 @@ object MLUtils {
               val index = pair(0).toInt - 1
               val value = pair(1).toDouble
 
-              (index, value)
-          }
+              Some((index, value))
+            }
 
-          Some((label, coos))
-        } else {
-          None
+            out.collect((label, coos))
+          }
         }
-    }
+      })
 
     // Calculate maximum dimension of vectors
     val dimensionDS = labelCOODS.map {

http://git-wip-us.apache.org/repos/asf/flink/blob/615cf42b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
index d896937..f02f5ff 100644
--- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
+++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/MLUtilsSuite.scala
@@ -40,9 +40,9 @@ class MLUtilsSuite extends FlatSpec with Matchers with FlinkTestBase {
 
     val content =
       """
-        |1 2:10.0 4:4.5 8:4.2 # foo
+        |1  2:10.0 4:4.5 8:4.2 # foo
         |-1 1:9.0 4:-4.5 7:2.4 # bar
-        |0.4 3:1.0 8:-5.6 10:1.0
+        |0.4  3:1.0 8:-5.6 10:1.0
         |-42.1 2:2.0 4:-6.1 3:5.1 # svm
       """.stripMargin
 

Reply via email to