Repository: flink
Updated Branches:
  refs/heads/master 6d8b3f5a1 -> c7415192e


[FLINK-5423] [ml] Implement Stochastic Outlier Selection (SOS) algorithm

Implemented the Stochastic Outlier Selection algorithm in the Machine Learning 
library, including the test code and scaladoc documentation. Furthermore 
extended the development documentation.

This closes #3077.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7415192
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7415192
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7415192

Branch: refs/heads/master
Commit: c7415192e06574e35bf2ce630fe70338be36af2b
Parents: 6d8b3f5
Author: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Authored: Fri Dec 30 07:38:52 2016 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Jan 19 14:48:50 2017 +0100

----------------------------------------------------------------------
 docs/dev/libs/ml/index.md                       |   4 +
 docs/dev/libs/ml/sos.md                         | 120 ++++++
 .../ml/outlier/StochasticOutlierSelection.scala | 383 +++++++++++++++++++
 .../StochasticOutlierSelectionITSuite.scala     | 236 ++++++++++++
 4 files changed, 743 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c7415192/docs/dev/libs/ml/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/ml/index.md b/docs/dev/libs/ml/index.md
index dcd3e0a..129be32 100644
--- a/docs/dev/libs/ml/index.md
+++ b/docs/dev/libs/ml/index.md
@@ -58,6 +58,10 @@ FlinkML currently supports the following algorithms:
 
 * [Alternating Least Squares (ALS)](als.html)
 
+### Outlier selection
+
+* [Stochastic Outlier Selection (SOS)](sos.html)
+
 ### Utilities
 
 * [Distance Metrics](distance_metrics.html)

http://git-wip-us.apache.org/repos/asf/flink/blob/c7415192/docs/dev/libs/ml/sos.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/ml/sos.md b/docs/dev/libs/ml/sos.md
new file mode 100644
index 0000000..22f4c30
--- /dev/null
+++ b/docs/dev/libs/ml/sos.md
@@ -0,0 +1,120 @@
+---
+mathjax: include
+title: Stochastic Outlier Selection
+nav-parent_id: ml
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+
+## Description
+
+An outlier is one or multiple observations that deviates quantitatively from 
the majority of the data set and may be the subject of further investigation.
+Stochastic Outlier Selection (SOS) developed by Jeroen 
Janssens[[1]](#janssens) is an unsupervised outlier-selection algorithm that 
takes as input a set of 
+vectors. The algorithm applies affinity-based outlier selection and outputs 
for each data point an outlier probability. 
+Intuitively, a data point is considered to be an outlier when the other data 
points have insufficient affinity with it.
+
+Outlier detection has its application in a number of field, for example, log 
analysis, fraud detection, noise removal, novelty detection, quality control,
+ sensor monitoring, etc. If a sensor turns faulty, it is likely that it will 
output values that deviate markedly from the majority.
+ 
+For more information, please consult the [PhD Thesis of Jeroens 
Janssens](https://github.com/jeroenjanssens/phd-thesis) on 
+Outlier Selection and One-Class Classification which introduces the algorithm. 
                                                                       
+
+## Parameters
+
+The stochastic outlier selection algorithm implementation can be controlled by 
the following parameters:
+
+   <table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 20%">Parameters</th>
+        <th class="text-center">Description</th>
+      </tr>
+    </thead>
+
+    <tbody>
+      <tr>
+        <td><strong>Perplexity</strong></td>
+        <td>
+          <p>
+            Perplexity can be interpreted as the k in k-nearest neighbor 
algorithms. The difference with SOS being a neighbor
+            is not a binary property, but a probabilistic one, and therefore 
it a real number. Must be between 1 and n-1, 
+            where n is the number of points. A good starting point can be 
obtained by using the square root of the number of observations. 
+            (Default value: <strong>30</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>ErrorTolerance</strong></td>
+        <td>
+          <p>
+            The accepted error tolerance to reduce computational time when 
approximating the affinity. It will 
+            sacrifice accuracy in return for reduced computational time.
+            (Default value: <strong>1e-20</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>MaxIterations</strong></td>
+        <td>
+          <p>
+            The maximum number of iterations to approximate the affinity of 
the algorithm.
+            (Default value: <strong>10</strong>)
+          </p>
+        </td>
+      </tr>
+    </tbody>
+  </table>
+
+
+## Example
+
+{% highlight scala %}
+val data = env.fromCollection(List(
+  LabeledVector(0.0, DenseVector(1.0, 1.0)),
+  LabeledVector(1.0, DenseVector(2.0, 1.0)),
+  LabeledVector(2.0, DenseVector(1.0, 2.0)),
+  LabeledVector(3.0, DenseVector(2.0, 2.0)),
+  LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
+))
+
+val sos = new StochasticOutlierSelection().setPerplexity(3)
+
+val outputVector = sos
+  .transform(data)
+  .collect()
+
+val expectedOutputVector = Map(
+  0 -> 0.2790094479202896,
+  1 -> 0.25775014551682535,
+  2 -> 0.22136130977995766,
+  3 -> 0.12707053787018444,
+  4 -> 0.9922779902453757 // The outlier!
+)
+
+outputVector.foreach(output => expectedOutputVector(output._1) should 
be(output._2))
+{% endhighlight %}
+
+**References**
+
+<a name="janssens"></a>[1]J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. 
van den Herik. 
+*Stochastic Outlier Selection*. Technical Report TiCC TR 2012-001, Tilburg 
University, Tilburg, the Netherlands, 2012.

http://git-wip-us.apache.org/repos/asf/flink/blob/c7415192/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
new file mode 100644
index 0000000..2c04bb0
--- /dev/null
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.outlier
+
+/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen 
Jansen
+  *
+  * For more information about SOS, see https://github.com/jeroenjanssens/sos
+  * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic
+  * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University,
+  * Tilburg, the Netherlands, 2012.
+  *
+  * @example
+  * {{{
+  *   val data = env.fromCollection(List(
+  *     LabeledVector(0.0, DenseVector(1.0, 1.0)),
+  *     LabeledVector(1.0, DenseVector(2.0, 1.0)),
+  *     LabeledVector(2.0, DenseVector(1.0, 2.0)),
+  *     LabeledVector(3.0, DenseVector(2.0, 2.0)),
+  *     LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
+  *   ))
+  *
+  *   val sos = new StochasticOutlierSelection().setPerplexity(3)
+  *
+  *   val outputVector = sos
+  *     .transform(data)
+  *     .collect()
+  *
+  *   val expectedOutputVector = Map(
+  *     0 -> 0.2790094479202896,
+  *     1 -> 0.25775014551682535,
+  *     2 -> 0.22136130977995766,
+  *     3 -> 0.12707053787018444,
+  *     4 -> 0.9922779902453757 // The outlier!
+  *   )
+  *
+  *   outputVector.foreach(output => expectedOutputVector(output._1) should 
be(output._2))
+  * }}}
+  *
+  * =Parameters=
+  *
+  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]:
+  *  Perplexity can be interpreted as the k in k-nearest neighbor algorithms. 
The difference is
+  *  in SOS being a neighbor is not a binary property, but a probabilistic 
one, and therefore it
+  *  a real number. Must be between 1 and n-1, where n is the number of points.
+  *  (Default value: '''30''')
+  *
+  *  - 
[[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]:
+  *  The accepted error tolerance when computing the perplexity. When 
increasing this number, it
+  *  will sacrifice accuracy in return for reduced computational time.
+  *  (Default value: '''1e-20''')
+  *
+  *  - 
[[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]:
+  *  The maximum number of iterations to perform to constrain the 
computational time.
+  *  (Default value: '''5000''')
+  */
+
+import breeze.linalg.functions.euclideanDistance
+import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => 
BreezeVector}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, 
WithParameters}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
+import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer}
+
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+class StochasticOutlierSelection extends 
Transformer[StochasticOutlierSelection] {
+
+  import StochasticOutlierSelection._
+
+
+  /** Sets the perplexity of the outlier selection algorithm, can be seen as 
the k of kNN
+    * For more information, please read the Stochastic Outlier Selection 
algorithm technical paper.
+    *
+    * @param perplexity the perplexity of the affinity fit
+    * @return
+    */
+  def setPerplexity(perplexity: Double): StochasticOutlierSelection = {
+    require(perplexity >= 1, "Perplexity must be at least one.")
+    parameters.add(Perplexity, perplexity)
+    this
+  }
+
+  /** The accepted error tolerance to reduce computational time when 
approximating the affinity.
+    *
+    * @param errorTolerance the accepted error tolerance with respect to the 
affinity
+    * @return
+    */
+  def setErrorTolerance(errorTolerance: Double): StochasticOutlierSelection = {
+    require(errorTolerance >= 0, "Error tolerance cannot be negative.")
+    parameters.add(ErrorTolerance, errorTolerance)
+    this
+  }
+
+  /** The maximum number of iterations to approximate the affinity of the 
algorithm.
+    *
+    * @param maxIterations the maximum number of iterations.
+    * @return
+    */
+  def setMaxIterations(maxIterations: Int): StochasticOutlierSelection = {
+    require(maxIterations > 0, "Maximum iterations must be positive.")
+    parameters.add(MaxIterations, maxIterations)
+    this
+  }
+
+}
+
+object StochasticOutlierSelection extends WithParameters {
+
+  // ========================================= Parameters 
==========================================
+  case object Perplexity extends Parameter[Double] {
+    val defaultValue: Option[Double] = Some(30)
+  }
+
+  case object ErrorTolerance extends Parameter[Double] {
+    val defaultValue: Option[Double] = Some(1e-20)
+  }
+
+  case object MaxIterations extends Parameter[Int] {
+    val defaultValue: Option[Int] = Some(5000)
+  }
+
+  // ==================================== Factory methods 
==========================================
+
+  def apply(): StochasticOutlierSelection = {
+    new StochasticOutlierSelection()
+  }
+
+  // ===================================== Operations 
==============================================
+  case class BreezeLabeledVector(idx: Int, data: BreezeVector[Double])
+
+  implicit val transformLabeledVectors = {
+
+    new TransformDataSetOperation[StochasticOutlierSelection, LabeledVector, 
(Int, Double)] {
+
+
+      /** Overrides the method of the parent class and applies the sochastic 
outlier selection
+        * algorithm.
+        *
+        * @param instance Instance of the class
+        * @param transformParameters The user defined parameters of the 
algorithm
+        * @param input A data set which consists of all the LabeledVectors, 
which should have an
+        *              index or unique integer label as vector.
+        * @return The outlierness of the vectors compared to each other
+        */
+      override def transformDataSet(instance: StochasticOutlierSelection,
+                                    transformParameters: ParameterMap,
+                                    input: DataSet[LabeledVector]): 
DataSet[(Int, Double)] = {
+
+        val resultingParameters = instance.parameters ++ transformParameters
+
+        val vectorsWithIndex = input.map(labeledVector => {
+          BreezeLabeledVector(labeledVector.label.toInt, 
labeledVector.vector.asBreeze)
+        })
+
+        // Don't map back to a labeled-vector since the output of the 
algorithm is
+        // a single double instead of vector
+        outlierSelection(vectorsWithIndex, resultingParameters)
+      }
+    }
+  }
+
+  /** [[TransformDataSetOperation]] applies the stochastic outlier selection 
algorithm on a
+    * [[Vector]] which will transform the high-dimensionaly input to a single 
Double output.
+    *
+    * @tparam T Type of the input and output data which has to be a subtype of 
[[Vector]]
+    * @return [[TransformDataSetOperation]] a single double which represents 
the oulierness of
+    *         the input vectors, where the output is in [0, 1]
+    */
+  implicit def transformVectors[T <: Vector : BreezeVectorConverter : 
TypeInformation : ClassTag]
+  = {
+    new TransformDataSetOperation[StochasticOutlierSelection, T, Double] {
+      override def transformDataSet(instance: StochasticOutlierSelection,
+                                    transformParameters: ParameterMap,
+                                    input: DataSet[T]): DataSet[Double] = {
+
+        val resultingParameters = instance.parameters ++ transformParameters
+
+        // Map to the right format
+        val vectorsWithIndex = input.zipWithUniqueId.map(vector => {
+          BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze)
+        })
+
+        outlierSelection(vectorsWithIndex, resultingParameters).map(_._2)
+      }
+    }
+  }
+
+  /** Internal entry point which will execute the different stages of the 
algorithm using a single
+    * interface
+    *
+    * @param inputVectors        Input vectors on which the stochastic outlier 
selection algorithm
+    *                            will be applied which should be the index or 
a unique integer value
+    * @param transformParameters The user defined parameters of the algorithm
+    * @return The outlierness of the vectors compared to each other
+    */
+  private def outlierSelection(inputVectors: DataSet[BreezeLabeledVector],
+                               transformParameters: ParameterMap): 
DataSet[(Int, Double)] = {
+    val dissimilarityVectors = computeDissimilarityVectors(inputVectors)
+    val affinityVectors = computeAffinity(dissimilarityVectors, 
transformParameters)
+    val bindingProbabilityVectors = 
computeBindingProbabilities(affinityVectors)
+    val outlierProbability = 
computeOutlierProbability(bindingProbabilityVectors)
+
+    outlierProbability
+  }
+
+  /** Compute pair-wise distance from each vector, to all other vectors.
+    *
+    * @param inputVectors The input vectors, will compare the vector to all 
other vectors based
+    *                     on an distance method.
+    * @return Returns new set of [[BreezeLabeledVector]] with dissimilarity 
vector
+    */
+  def computeDissimilarityVectors(inputVectors: DataSet[BreezeLabeledVector]):
+  DataSet[BreezeLabeledVector] =
+  inputVectors.cross(inputVectors) {
+    (a, b) => (a.idx, b.idx, euclideanDistance(a.data, b.data))
+  }.filter(dist => dist._1 != dist._2) // Filter out the diagonal, this 
contains no information.
+    .groupBy(0)
+    .sortGroup(1, Order.ASCENDING)
+    .reduceGroup {
+      distancesIterator => {
+        val distances = distancesIterator.toList
+        val distanceVector = distances.map(_._3).toArray
+
+        BreezeLabeledVector(distances.head._1, 
BreezeDenseVector(distanceVector))
+      }
+    }
+
+  /** Approximate the affinity by fitting a Gaussian-like function
+    *
+    * @param dissimilarityVectors The dissimilarity vectors which represents 
the distance to the
+    *                             other vectors in the data set.
+    * @param resultingParameters  The user defined parameters of the algorithm
+    * @return Returns new set of [[BreezeLabeledVector]] with dissimilarity 
vector
+    */
+  def computeAffinity(dissimilarityVectors: DataSet[BreezeLabeledVector],
+                      resultingParameters: ParameterMap): 
DataSet[BreezeLabeledVector] = {
+    val logPerplexity = Math.log(resultingParameters(Perplexity))
+    val maxIterations = resultingParameters(MaxIterations)
+    val errorTolerance = resultingParameters(ErrorTolerance)
+
+    dissimilarityVectors.map(vec => {
+      val breezeVec = binarySearch(vec.data, logPerplexity, maxIterations, 
errorTolerance)
+      BreezeLabeledVector(vec.idx, breezeVec)
+    })
+  }
+
+  /** Normalizes the input vectors so each row sums up to one.
+    *
+    * @param affinityVectors The affinity vectors which is the quantification 
of the relationship
+    *                        between the original vectors.
+    * @return Returns new set of [[BreezeLabeledVector]] with represents the 
binding
+    *         probabilities, which is in fact the affinity where each row sums 
up to one.
+    */
+  def computeBindingProbabilities(affinityVectors: 
DataSet[BreezeLabeledVector]):
+  DataSet[BreezeLabeledVector] =
+  affinityVectors.map(vec => BreezeLabeledVector(vec.idx, vec.data :/ 
sum(vec.data)))
+
+  /** Compute the final outlier probability by taking the product of the 
column.
+    *
+    * @param bindingProbabilityVectors The binding probability vectors where 
the binding
+    *                                  probability is based on the affinity 
and represents the
+    *                                  probability of a vector binding with 
another vector.
+    * @return Returns a single double which represents the final outlierness 
of the input vector.
+    */
+  def computeOutlierProbability(bindingProbabilityVectors: 
DataSet[BreezeLabeledVector]):
+  DataSet[(Int, Double)] = bindingProbabilityVectors
+    .flatMap(vec => vec.data.toArray.zipWithIndex.map(pair => {
+
+      // The DistanceMatrix removed the diagonal, but we need to compute the 
product
+      // of the column, so we need to correct the offset.
+      val columnIndex = if (pair._2 >= vec.idx) {
+        1
+      } else {
+        0
+      }
+
+      (columnIndex + pair._2, pair._1)
+    })).groupBy(0).reduceGroup {
+    probabilities => {
+      var rowNumber = -1
+      var outlierProbability = 1.0
+      for (probability <- probabilities) {
+        rowNumber = probability._1
+        outlierProbability = outlierProbability * (1.0 - probability._2)
+      }
+
+      (rowNumber, outlierProbability)
+    }
+  }
+
+  /** Performs a binary search to get affinities in such a way that each 
conditional Gaussian has
+    *  the same perplexity.
+    *
+    * @param dissimilarityVector The input dissimilarity vector which 
represents the current
+    *                            vector distance to the other vectors in the 
data set
+    * @param logPerplexity The log of the perplexity, which represents the 
probability of having
+    *                      affinity with another vector.
+    * @param maxIterations The maximum iterations to limit the computational 
time.
+    * @param tolerance The allowed tolerance to sacrifice precision for 
decreased computational
+    *                  time.
+    * @param beta: The current beta
+    * @param betaMin The lower bound of beta
+    * @param betaMax The upper bound of beta
+    * @param iteration The current iteration
+    * @return Returns the affinity vector of the input vector.
+    */
+  def binarySearch(
+      dissimilarityVector: BreezeVector[Double],
+      logPerplexity: Double,
+      maxIterations: Int,
+      tolerance: Double,
+      beta: Double = 1.0,
+      betaMin: Double = Double.NegativeInfinity,
+      betaMax: Double = Double.PositiveInfinity,
+      iteration: Int = 0)
+    : BreezeVector[Double] = {
+
+    val newAffinity = dissimilarityVector.map(d => Math.exp(-d * beta))
+    val sumA = sum(newAffinity)
+    val hCurr = Math.log(sumA) + beta * sum(dissimilarityVector :* 
newAffinity) / sumA
+    val hDiff = hCurr - logPerplexity
+
+    if (iteration < maxIterations && Math.abs(hDiff) > tolerance) {
+      // Compute the Gaussian kernel and entropy for the current precision
+      val (newBeta, newBetaMin, newBetaMax) = if (hDiff.isNaN) {
+        (beta / 10.0, betaMin, betaMax) // Reduce beta to get it in range
+      } else {
+        if (hDiff > 0) {
+          val newBeta =
+            if (betaMax == Double.PositiveInfinity || betaMax == 
Double.NegativeInfinity) {
+              beta * 2.0
+            } else {
+              (beta + betaMax) / 2.0
+            }
+
+          (newBeta, beta, betaMax)
+        } else {
+          val newBeta =
+            if (betaMin == Double.PositiveInfinity || betaMin == 
Double.NegativeInfinity) {
+              beta / 2.0
+            } else {
+              (beta + betaMin) / 2.0
+            }
+
+          (newBeta, betaMin, beta)
+        }
+      }
+
+      binarySearch(dissimilarityVector,
+        logPerplexity,
+        maxIterations,
+        tolerance,
+        newBeta,
+        newBetaMin,
+        newBetaMax,
+        iteration + 1)
+    }
+    else {
+      newAffinity
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c7415192/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala
new file mode 100644
index 0000000..dc43278
--- /dev/null
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.outlier
+
+import breeze.linalg.{sum, DenseVector => BreezeDenseVector}
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+import 
org.apache.flink.ml.outlier.StochasticOutlierSelection.BreezeLabeledVector
+import org.apache.flink.ml.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+class StochasticOutlierSelectionITSuite extends FlatSpec with Matchers with 
FlinkTestBase {
+  behavior of "Stochastic Outlier Selection algorithm"
+  val EPSILON = 1e-16
+
+  /*
+    Unit-tests created based on the Python scripts of the algorithms author'
+    https://github.com/jeroenjanssens/scikit-sos
+
+    For more information about SOS, see https://github.com/jeroenjanssens/sos
+    J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic
+    Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University,
+    Tilburg, the Netherlands, 2012.
+   */
+
+  val perplexity = 3
+  val errorTolerance = 0
+  val maxIterations = 5000
+  val parameters = new 
StochasticOutlierSelection().setPerplexity(perplexity).parameters
+
+  val env = ExecutionEnvironment.getExecutionEnvironment
+
+  it should "Compute the perplexity of the vector and return the correct 
error" in {
+    val vector = BreezeDenseVector(Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 
9.0, 10.0))
+
+    val output = Array(
+      0.39682901665799636,
+      0.15747326846175236,
+      0.06248996227359784,
+      0.024797830280027126,
+      0.009840498605275054,
+      0.0039049953849556816,
+      6.149323865970302E-4,
+      2.4402301428445443E-4,
+      9.683541280042027E-5
+    )
+
+    val search = StochasticOutlierSelection.binarySearch(
+      vector,
+      Math.log(perplexity),
+      maxIterations,
+      errorTolerance
+    ).toArray
+
+    search should be(output)
+  }
+
+  it should "Compute the distance matrix and give symmetrical distances" in {
+
+    val data = env.fromCollection(List(
+      BreezeLabeledVector(0, BreezeDenseVector(Array(1.0, 3.0))),
+      BreezeLabeledVector(1, BreezeDenseVector(Array(5.0, 1.0)))
+    ))
+
+    val distanceMatrix = StochasticOutlierSelection
+      .computeDissimilarityVectors(data)
+      .map(_.data)
+      .collect()
+      .toArray
+
+    distanceMatrix(0) should be(distanceMatrix(1))
+  }
+
+  it should "Compute the distance matrix and give the correct distances" in {
+
+    val expectedDistanceMatrix = Array(
+      Array(Math.sqrt(2.0), Math.sqrt(10.0)),
+      Array(Math.sqrt(2.0), Math.sqrt(16.0)),
+      Array(Math.sqrt(16.0), Math.sqrt(10.0))
+    )
+
+    val data = env.fromCollection(Array(
+      BreezeLabeledVector(0, BreezeDenseVector(Array(1.0, 1.0))),
+      BreezeLabeledVector(1, BreezeDenseVector(Array(2.0, 2.0))),
+      BreezeLabeledVector(2, BreezeDenseVector(Array(5.0, 1.0)))
+    ))
+
+    val distanceMatrix = StochasticOutlierSelection
+      .computeDissimilarityVectors(data)
+      .map(_.data.toArray)
+      .collect()
+      .sortBy(dist => sum(dist))
+      .toArray
+
+    distanceMatrix should be(expectedDistanceMatrix)
+  }
+
+  it should "Computing the affinity matrix and return the correct affinity" in 
{
+
+    val data = env.fromCollection(List(
+      BreezeLabeledVector(0, BreezeDenseVector(Array(1.0, 1.0))),
+      BreezeLabeledVector(1, BreezeDenseVector(Array(2.0, 1.0))),
+      BreezeLabeledVector(2, BreezeDenseVector(Array(1.0, 2.0))),
+      BreezeLabeledVector(3, BreezeDenseVector(Array(2.0, 2.0))),
+      BreezeLabeledVector(4, BreezeDenseVector(Array(5.0, 8.0))) // The 
outlier!
+    ))
+
+    val distanceMatrix = 
StochasticOutlierSelection.computeDissimilarityVectors(data)
+
+
+    val affinityMatrix = 
StochasticOutlierSelection.computeAffinity(distanceMatrix, parameters)
+      .collect()
+      .map(_.data.toArray)
+      .sortBy(dist => sum(dist))
+      .toArray
+
+    val expectedAffinityMatrix = Array(
+      Array(
+        1.6502458086204375E-6, 3.4496775759599478E-6, 6.730049701933432E-6, 
1.544221669904019E-5),
+      Array(0.2837044890495805, 0.4103155587026411, 0.4103155587026411, 
0.0025393148189994897),
+      Array(0.43192525601205634, 0.30506325262816036, 0.43192525601205634, 
0.0023490595181415333),
+      Array(0.44804626736879755, 0.3212891538762665, 0.44804626736879755, 
0.0022108233460722557),
+      Array(0.46466276524577704, 0.46466276524577704, 0.3382687394674377, 
0.002071952211368232)
+    )
+
+    affinityMatrix should be(expectedAffinityMatrix)
+  }
+
+  it should "Compute the binding probabilities and return the correct 
probabilities" in {
+
+    val expectedBindingProbabilityMatrix = Array(
+      Array(0.00000000000000000, 0.3659685430819966, 0.36596854308199660,
+        0.2664300527549236, 0.0016328610810832176),
+      Array(0.06050907527090226, 0.1264893287483121, 0.24677254025174370,
+        0.5662290557290419, 0.0000000000000000000),
+      Array(0.25630819225892230, 0.3706990977807361, 0.37069909778073610,
+        0.0000000000000000, 0.0022936121796053232),
+      Array(0.36737364041784460, 0.0000000000000000, 0.26343993596023335,
+        0.3673736404178446, 0.0018127832040774768),
+      Array(0.36877315905154990, 0.2604492865700658, 0.00000000000000000,
+        0.3687731590515499, 0.0020043953268345785)
+    )
+
+    // The distance matrix
+    val data = env.fromCollection(List(
+      BreezeLabeledVector(0, new BreezeDenseVector(
+        Array(0.00000000e+00, 4.64702705e-01, 4.64702705e-01, 3.38309859e-01, 
2.07338848e-03))),
+      BreezeLabeledVector(1, new BreezeDenseVector(
+        Array(4.48047312e-01, 0.00000000e+00, 3.21290213e-01, 4.48047312e-01, 
2.21086260e-03))),
+      BreezeLabeledVector(2, new BreezeDenseVector(
+        Array(4.31883411e-01, 3.05021457e-01, 0.00000000e+00, 4.31883411e-01, 
2.34741892e-03))),
+      BreezeLabeledVector(3, new BreezeDenseVector(
+        Array(2.83688288e-01, 4.10298990e-01, 4.10298990e-01, 0.00000000e+00, 
2.53862706e-03))),
+      BreezeLabeledVector(4, new BreezeDenseVector(
+        Array(1.65000529e-06, 3.44920263e-06, 6.72917236e-06, 1.54403440e-05, 
0.00000000e+00)))
+    ))
+
+    val bindingProbabilityMatrix = 
StochasticOutlierSelection.computeBindingProbabilities(data)
+      .map(_.data.toArray)
+      .collect()
+      .sortBy(_ (0)) // Sort by the first element, because the sum is always 
equal to 1
+      .toArray
+
+    bindingProbabilityMatrix should be(expectedBindingProbabilityMatrix)
+  }
+
+
+  it should "Compute the product of the vector, should return the correct 
values" in {
+
+    val data = env.fromCollection(List(
+      BreezeLabeledVector(0, BreezeDenseVector(0.5, 0.3)),
+      BreezeLabeledVector(1, BreezeDenseVector(0.25, 0.1)),
+      BreezeLabeledVector(2, BreezeDenseVector(0.8, 0.8))
+    ))
+
+    val outlierMatrix = 
StochasticOutlierSelection.computeOutlierProbability(data)
+      .map(_._2)
+      .collect()
+      .sortBy(dist => dist)
+      .toArray
+
+    // The math by hand
+    val expectedOutlierMatrix = Array(
+      (1.0 - 0.5) * (1.0 - 0.0) * (1.0 - 0.8),
+      (1.0 - 0.0) * (1.0 - 0.25) * (1.0 - 0.8),
+      (1.0 - 0.3) * (1.0 - 0.1) * (1.0 - 0)
+    )
+
+    outlierMatrix should be(expectedOutlierMatrix)
+  }
+
+  it should "Verifying the output of the SOS algorithm assign the one true 
outlier" in {
+
+    val data = env.fromCollection(List(
+      LabeledVector(0.0, DenseVector(1.0, 1.0)),
+      LabeledVector(1.0, DenseVector(2.0, 1.0)),
+      LabeledVector(2.0, DenseVector(1.0, 2.0)),
+      LabeledVector(3.0, DenseVector(2.0, 2.0)),
+      LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
+    ))
+
+    val sos = new StochasticOutlierSelection().setPerplexity(3)
+
+    val outputVector = sos
+      .transform(data)
+      .collect()
+
+    val expectedOutputVector = Map(
+      0 -> 0.2790094479202896,
+      1 -> 0.25775014551682535,
+      2 -> 0.22136130977995766,
+      3 -> 0.12707053787018444,
+      4 -> 0.9922779902453757 // The outlier!
+    )
+
+    outputVector.foreach(output =>
+      expectedOutputVector(output._1) should be(output._2 +- EPSILON))
+  }
+}

Reply via email to