Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2942#discussion_r19490284
--- Diff: docs/mllib-clustering.md ---
@@ -153,3 +153,75 @@ provided in the [Self-Contained Applications](quick-start.html#self-contained-applications) section of the Spark Quick Start guide. Be sure to also include *spark-mllib* in your build file as a dependency.
+
+## Streaming clustering
+
+When data arrive in a stream, we may want to estimate clusters dynamically, updating them as new data arrive. MLlib provides support for streaming KMeans clustering, with parameters to control the decay (or "forgetfulness") of the estimates. The algorithm uses a generalization of the mini-batch KMeans update rule. For each batch of data, we assign all points to their nearest cluster, compute the new cluster centers, and then update each cluster using:
+
+`\begin{equation}
+ c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t}
+\end{equation}`
+`\begin{equation}
+ n_{t+1} = n_t + m_t
+\end{equation}`
+
+Where `$c_t$` is the previous center for the cluster, `$n_t$` is the number of points assigned to the cluster thus far, `$x_t$` is the new cluster center from the current batch, and `$m_t$` is the number of points added to the cluster in the current batch. The decay factor `$\alpha$` can be used to ignore the past: with `$\alpha = 1$` all data will be used from the beginning; with `$\alpha = 0$` only the most recent data will be used. This is analogous to an exponentially-weighted moving average.
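The effect of the decay factor can be checked with a one-step numeric sketch of the update rule. This is plain Python purely for illustration, not the Spark API; the helper `update_center` is a hypothetical name:

```python
# Illustrative sketch (not Spark API): one step of the streaming k-means
# update rule from the equations above.
# c_t: previous center, n_t: points seen so far, x_t: mean of the new
# batch's points for this cluster, m_t: batch count, alpha: decay factor.

def update_center(c_t, n_t, x_t, m_t, alpha):
    """Return (c_{t+1}, n_{t+1}) for a single cluster."""
    c_next = [(c * n_t * alpha + x * m_t) / (n_t * alpha + m_t)
              for c, x in zip(c_t, x_t)]
    n_next = n_t + m_t
    return c_next, n_next

# With alpha = 1, all points ever seen carry equal weight:
c, n = update_center([0.0, 0.0], 10, [1.0, 1.0], 10, alpha=1.0)
# c == [0.5, 0.5], n == 20

# With alpha = 0, the past is ignored and the batch mean dominates:
c, n = update_center([0.0, 0.0], 10, [1.0, 1.0], 10, alpha=0.0)
# c == [1.0, 1.0]
```

Intermediate values of `alpha` interpolate between these two extremes, matching the exponentially-weighted moving average analogy.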
+
+### Examples
+
+This example shows how to estimate clusters on streaming data.
+
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+
+First we import the necessary classes.
+
+{% highlight scala %}
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.clustering.StreamingKMeans
+
+{% endhighlight %}
+
+Then we make an input stream of vectors for training, as well as one for testing. We assume a StreamingContext `ssc` has been created; see the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info. For this example, we use vector data.
+
+{% highlight scala %}
+
+val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
+val testData = ssc.textFileStream("/testing/data/dir").map(Vectors.parse)
+
+{% endhighlight %}
+
+We create a model with random clusters and specify the number of clusters to find.
+
+{% highlight scala %}
+
+val numDimensions = 3
+val numClusters = 2
+val model = new StreamingKMeans()
+ .setK(numClusters)
+ .setDecayFactor(1.0)
+ .setRandomCenters(numDimensions, 0.0)
+
+{% endhighlight %}
+
+Now register the streams for training and testing and start the job, printing the predicted cluster assignments on new data points as they arrive.
+
+{% highlight scala %}
+
+model.trainOn(trainingData)
+model.predictOn(testData).print()
--- End diff ---
`predictOn` only outputs the prediction, which is not very useful. Maybe we should use `predictOnValues` here.