This simple example works for me; it prints out the updated model centers.
I'm running from the master branch.

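    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.clustering.StreamingKMeans
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable.SynchronizedQueue

    val sc = new SparkContext("local[2]", "test")
    val ssc = new StreamingContext(sc, Seconds(1))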

    val kMeans = new StreamingKMeans()
        .setK(2)
        .setDecayFactor(0.0)
        .setInitialCenters(Array(Vectors.dense(0.0), Vectors.dense(1.0)),
          Array(1.0, 1.0))

    val rddQueue = new SynchronizedQueue[RDD[Vector]]()

    val data1 = sc.parallelize(Array(
      Vectors.dense(-0.5),
      Vectors.dense(0.6),
      Vectors.dense(0.8)
    ))

    val data2 = sc.parallelize(Array(
      Vectors.dense(0.2),
      Vectors.dense(-0.1),
      Vectors.dense(0.3)
    ))

    rddQueue += data1
    rddQueue += data2

    val inputStream = ssc.queueStream(rddQueue)

    kMeans.trainOn(inputStream)

    val predictStream = kMeans.predictOn(inputStream)

    def collect(rdd: RDD[Int]): Unit = {
      val rdd_collect = rdd.collect()
      println(s"predict_results: ${rdd_collect.mkString(",")}")
      kMeans.latestModel.clusterCenters.foreach(println)
    }

    predictStream.foreachRDD(collect _)

    ssc.start()
    ssc.awaitTermination()
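
To make it easier to see why the printed centers should move on every batch in the example above, here is a rough, Spark-free sketch of the decayed update that StreamingKMeans is documented to apply, c_new = (c * n * a + sum of new points) / (n * a + m), in one dimension. The object name DecayedKMeansSketch and the helper update are made up for illustration. Carrying n * a + m forward as the new weight is my assumption, and Spark's real implementation does more (for example, it handles dying clusters), so treat this as a sanity check and not as the library's code.

```scala
// Sketch of a one-dimensional streaming k-means update (NOT Spark's code).
object DecayedKMeansSketch {
  // Hypothetical helper: apply one mini-batch to the current centers and weights.
  def update(
      centers: Array[Double],
      weights: Array[Double],
      batch: Seq[Double],
      decayFactor: Double): (Array[Double], Array[Double]) = {
    // Assign every point to the index of its nearest current center.
    val assigned = batch.groupBy(x => centers.indices.minBy(i => math.abs(x - centers(i))))
    val updated = centers.indices.map { i =>
      val points = assigned.getOrElse(i, Seq.empty[Double])
      val discounted = weights(i) * decayFactor // n * a
      val total = discounted + points.size      // n * a + m
      // A cluster with no remaining weight and no new points keeps its center.
      val center =
        if (total == 0.0) centers(i)
        else (centers(i) * discounted + points.sum) / total
      (center, total) // assumption: the new weight is n * a + m
    }
    (updated.map(_._1).toArray, updated.map(_._2).toArray)
  }

  def main(args: Array[String]): Unit = {
    // Same initial centers, weights, decay factor and batches as the example above.
    val (c1, w1) = update(Array(0.0, 1.0), Array(1.0, 1.0), Seq(-0.5, 0.6, 0.8), 0.0)
    val (c2, _) = update(c1, w1, Seq(0.2, -0.1, 0.3), 0.0)
    println(s"after batch 1: ${c1.mkString(", ")}")
    println(s"after batch 2: ${c2.mkString(", ")}")
  }
}
```

With a decay factor of 0.0 the old centers are forgotten entirely, so in this sketch the centers land at roughly -0.5 and 0.7 after the first batch and roughly -0.1 and 0.25 after the second. If the centers printed by a streaming job never change at all, the print statement is probably running only once, outside of foreachRDD.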

On Fri, Feb 19, 2016 at 1:15 PM, krishna ramachandran <[email protected]>
wrote:

> Also, the cluster centroids I get in streaming mode (some with negative
> values) do not make sense. If I use the same data and run in batch mode:
>
> KMeans.train(sc.parallelize(parsedData), numClusters, numIterations)
>
> the cluster centers are what you would expect.
>
> Krishna
>
>
>
> On Fri, Feb 19, 2016 at 12:49 PM, krishna ramachandran <[email protected]>
> wrote:
>
>> OK, I will share a simple example soon. In the meantime, you should be
>> able to see this behavior using the example here:
>>
>>
>> https://github.com/apache/spark/blob/branch-1.2/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala
>>
>> slightly modify it to include
>>
>> model.latestModel.clusterCenters.foreach(println)
>>
>> (after model.trainOn)
>>
>> add new files to trainingDir periodically
>>
>> I have 3 dimensions per data point. They look like this:
>>
>> [1, 1, 385.2241452777778]
>>
>> [3, 1, 384.7529463888889]
>>
>> [4, 1, 3083.2778025]
>>
>> [2, 4, 6226.402321388889]
>>
>> [1, 2, 785.8426655555555]
>>
>> [5, 1, 6706.054241388889]
>>
>> ........
>>
>> and monitor. Please let me know if I missed something.
>>
>> Krishna
>>
>>
>>
>>
>>
>> On Fri, Feb 19, 2016 at 10:59 AM, Bryan Cutler <[email protected]> wrote:
>>
>>> Can you share more of your code to reproduce this issue?  The model
>>> should be updated with each batch, but I can't tell what is happening
>>> from what you posted so far.
>>>
>>> On Fri, Feb 19, 2016 at 10:40 AM, krishna ramachandran <[email protected]>
>>> wrote:
>>>
>>>> Hi Bryan
>>>> Agreed. It is a single statement to print the centers once for *every*
>>>> streaming batch (4 secs). Remember this is in streaming mode and the
>>>> receiver has fresh data every batch. That is, since the model is trained
>>>> continuously, I expect the centroids to change with incoming streams (at
>>>> least until convergence).
>>>>
>>>> But I am always seeing the same centers for the entire duration. I ran
>>>> the app for several hours with a custom receiver.
>>>>
>>>> Yes, I am using the latestModel to predict using "labeled" test data.
>>>> But I would also like to know where my centers are.
>>>>
>>>> regards
>>>> Krishna
>>>>
>>>>
>>>>
>>>> On Fri, Feb 19, 2016 at 10:18 AM, Bryan Cutler <[email protected]>
>>>> wrote:
>>>>
>>>>> Could you elaborate on where the issue is?  You say calling
>>>>> model.latestModel.clusterCenters.foreach(println) doesn't show an updated
>>>>> model, but that is just a single statement to print the centers once.
>>>>>
>>>>> Also, is there any reason you don't predict on the test data like this?
>>>>>
>>>>> model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 18, 2016 at 5:59 PM, ramach1776 <[email protected]> wrote:
>>>>>
>>>>>> I have a streaming application wherein I train the model using a
>>>>>> receiver input stream in 4 sec batches:
>>>>>>
>>>>>> // receiver gets new data every batch
>>>>>> val stream = ssc.receiverStream(receiver)
>>>>>> model.trainOn(stream.map(Vectors.parse))
>>>>>>
>>>>>> If I use
>>>>>> model.latestModel.clusterCenters.foreach(println)
>>>>>>
>>>>>> the values of the cluster centers remain unchanged from the initial
>>>>>> values they got during the first iteration (when the streaming app
>>>>>> started).
>>>>>>
>>>>>> When I use the model to predict cluster assignments with a labeled
>>>>>> input, the assignments change over time as expected:
>>>>>>
>>>>>>       testData.transform { rdd =>
>>>>>>         rdd.map(lp => (lp.label, model.latestModel().predict(lp.features)))
>>>>>>       }.print()
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/StreamingKMeans-does-not-update-cluster-centroid-locations-tp26275.html
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
