Re: KMeans Clustering is not Reproducible

2017-05-24 Thread Christoph Brücke
Hi Ankur,

thank you for answering. But my problem is not that I'm stuck in a local
extremum, but rather the reproducibility of kmeans. What I'm trying to
achieve is: when the input data and all the parameters stay the same,
especially the seed, I want to get the exact same results, even when the
partitioning changes. As far as I'm concerned, if I set a seed in an ML
algorithm, I would expect that algorithm to be deterministic.

Unfortunately I couldn't find any information on whether this is a goal of
Spark's MLlib or not.

Maybe a little bit of background: I'm trying to benchmark some ML
algorithms while changing my cluster config, i.e. I want to find the best
cluster config that still achieves the same results. But what I see is
that when I change the number of executors, the runs become incomparable,
since the results differ.

So in essence my question is: are the algorithms in MLlib
partition-agnostic or not?
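
For what it's worth, the workaround I'm currently experimenting with (my
own assumption, not something the MLlib docs promise) is to pin the
partitioning right before fitting, reusing `data` and the seeded KMeans
from my script further down:

```
import org.apache.spark.ml.clustering.KMeans

// pin the partition count so the seeded sampling sees the same
// partitions regardless of how many executors the cluster has.
// NOTE: this assumes repartition itself assigns rows to partitions
// deterministically, which I have not verified.
val fixedData = data.repartition(4)
fixedData.cache()
fixedData.count() // materialize once before fitting

val kmeans = new KMeans().setK(10).setSeed(9876L)
println("fixed partitioning: " + kmeans.fit(fixedData).computeCost(fixedData))
```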

Thanks for your help,
Christoph

On 24.05.2017 at 20:49, "Ankur Srivastava"  wrote:

Hi Christoph,

I am not an expert in ML and have not used Spark KMeans, but your problem
seems to be an issue of a local minimum vs. the global minimum. You should
run K-means multiple times with random starting points and also try
multiple values of K (unless you are already sure).

Hope this helps.

Thanks
Ankur



On Wed, May 24, 2017 at 2:15 AM, Christoph Bruecke 
wrote:

> Hi Anastasios,
>
> thanks for the reply but caching doesn’t seem to change anything.
>
> After further investigation it really seems that the RDD#takeSample method
> is the cause of the non-reproducibility.
>
> Is this considered a bug and should I open an Issue for that?
>
> BTW: my example script contains a little typo in line 3: it is `feature`
> not `features` (mind the `s`).
>
> Best,
> Christoph
>
> The script with caching
>
> ```
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.feature.VectorAssembler
> import org.apache.spark.storage.StorageLevel
>
> // generate random data for clustering
> val randomData = spark.range(1, 1000).withColumn("a",
> rand(123)).withColumn("b", rand(321))
>
> val vecAssembler = new VectorAssembler().setInputCols(Array("a",
> "b")).setOutputCol("features")
>
> val data = vecAssembler.transform(randomData)
>
> // instantiate KMeans with fixed seed
> val kmeans = new KMeans().setK(10).setSeed(9876L)
>
> // train the model with different partitioning
> val dataWith1Partition = data.repartition(1)
> // cache the data
> dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
> println("1 Partition: " + kmeans.fit(dataWith1Partition)
> .computeCost(dataWith1Partition))
>
> val dataWith4Partition = data.repartition(4)
> // cache the data
> dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
> println("4 Partition: " + kmeans.fit(dataWith4Partition)
> .computeCost(dataWith4Partition))
>
>
> ```
>
> Output:
>
> ```
> 1 Partition: 16.028212597888057
> 4 Partition: 16.14758460544976
> ```
>
> > On 22 May 2017, at 16:33, Anastasios Zouzias  wrote:
> >
> > Hi Christoph,
> >
> > Take a look at this, you might end up having a similar case:
> >
> > http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/
> >
> > If this is not the case, then I agree with you that kmeans should be
> > partitioning agnostic (although I haven't checked the code yet).
> >
> > Best,
> > Anastasios
> >
> >
> > On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke 
> wrote:
> > Hi,
> >
> > I’m trying to figure out how to use KMeans in order to achieve
> > reproducible results. I have found that running the same KMeans instance
> > on the same data with different partitioning will produce different
> > clusterings.
> >
> > A simple KMeans run with a fixed seed returns different results on the
> > same training data if the training data is partitioned differently.
> >
> > Consider the following example. The same KMeans clustering set up is run
> on
> > identical data. The only difference is the partitioning of the training
> data
> > (one partition vs. four partitions).
> >
> > ```
> > import org.apache.spark.sql.DataFrame
> > import org.apache.spark.ml.clustering.KMeans
> > import org.apache.spark.ml.features.VectorAssembler
> >
> > // generate random data for clustering
> > val randomData = spark.range(1, 1000).withColumn("a",
> rand(123)).withColumn("b", rand(321))
> >
> > val vecAssembler = new VectorAssembler().setInputCols(Array("a",
> "b")).setOutputCol("features")
> >
> > val data = vecAssembler.transform(randomData)
> >
> > // instantiate KMeans with fixed seed
> > val kmeans = new KMeans().setK(10).setSeed(9876L)
> >
> > // train the model with different partitioning
> > val dataWith1Partition = data.repartition(1)
> > println("1 Partition: " + kmeans.fit(dataWith1Partition)
> .computeCost(dataWith1Partition))
> >
> > val dataWith4Partition = data.repartition(4)
> > println("4 Partition: " + kmeans.fit(dataWith4Partition)
> .computeCost(dataWith4Partition))
> > ```

Re: KMeans Clustering is not Reproducible

2017-05-24 Thread Yu Zhang
I agree with what Ankur said. The kmeans seeding routine (the 'takeSample'
method) runs in parallel, so each partition draws its sample points from
its local data, which is why the result depends on the partitioning. The
seeding method is based on the k-means|| algorithm of Bahmani et al.,
which gives approximation guarantees on the kmeans cost.

You could set the initial seeding points yourself, which would avoid the
partition-dependence issue.
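
For illustration, a minimal sketch of that suggestion. It uses the
RDD-based API (org.apache.spark.mllib), which, unlike the DataFrame-based
spark.ml KMeans, exposes `setInitialModel`; the center values here are
made up:

```
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// hypothetical fixed initial centers for k = 3 on 2-dimensional data
val initialCenters = Array(
  Vectors.dense(0.1, 0.2),
  Vectors.dense(0.5, 0.5),
  Vectors.dense(0.9, 0.8)
)

val kmeans = new KMeans()
  .setK(3)
  .setMaxIterations(20)
  .setInitialModel(new KMeansModel(initialCenters))

// with fixed initial centers the seeding involves no sampling, so that
// source of partition-dependence is gone; points is an RDD[Vector]
// val model = kmeans.run(points)
```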



Regards,
Yu Zhang

On Wed, May 24, 2017 at 1:49 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:

> Hi Christoph,
>
> I am not an expert in ML and have not used Spark KMeans, but your problem
> seems to be an issue of a local minimum vs. the global minimum. You should
> run K-means multiple times with random starting points and also try
> multiple values of K (unless you are already sure).
>
> Hope this helps.
>
> Thanks
> Ankur
>
>
>
> On Wed, May 24, 2017 at 2:15 AM, Christoph Bruecke 
> wrote:
>
>> Hi Anastasios,
>>
>> thanks for the reply but caching doesn’t seem to change anything.
>>
>> After further investigation it really seems that the RDD#takeSample
>> method is the cause of the non-reproducibility.
>>
>> Is this considered a bug and should I open an Issue for that?
>>
>> BTW: my example script contains a little typo in line 3: it is `feature`
>> not `features` (mind the `s`).
>>
>> Best,
>> Christoph
>>
>> The script with caching
>>
>> ```
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.ml.clustering.KMeans
>> import org.apache.spark.ml.feature.VectorAssembler
>> import org.apache.spark.storage.StorageLevel
>>
>> // generate random data for clustering
>> val randomData = spark.range(1, 1000).withColumn("a",
>> rand(123)).withColumn("b", rand(321))
>>
>> val vecAssembler = new VectorAssembler().setInputCols(Array("a",
>> "b")).setOutputCol("features")
>>
>> val data = vecAssembler.transform(randomData)
>>
>> // instantiate KMeans with fixed seed
>> val kmeans = new KMeans().setK(10).setSeed(9876L)
>>
>> // train the model with different partitioning
>> val dataWith1Partition = data.repartition(1)
>> // cache the data
>> dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
>> println("1 Partition: " + kmeans.fit(dataWith1Partition)
>> .computeCost(dataWith1Partition))
>>
>> val dataWith4Partition = data.repartition(4)
>> // cache the data
>> dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
>> println("4 Partition: " + kmeans.fit(dataWith4Partition)
>> .computeCost(dataWith4Partition))
>>
>>
>> ```
>>
>> Output:
>>
>> ```
>> 1 Partition: 16.028212597888057
>> 4 Partition: 16.14758460544976
>> ```
>>
>> > On 22 May 2017, at 16:33, Anastasios Zouzias  wrote:
>> >
>> > Hi Christoph,
>> >
>> > Take a look at this, you might end up having a similar case:
>> >
>> > http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/
>> >
>> > If this is not the case, then I agree with you that kmeans should be
>> > partitioning agnostic (although I haven't checked the code yet).
>> >
>> > Best,
>> > Anastasios
>> >
>> >
>> > On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke 
>> wrote:
>> > Hi,
>> >
>> > I’m trying to figure out how to use KMeans in order to achieve
>> > reproducible results. I have found that running the same KMeans instance
>> > on the same data with different partitioning will produce different
>> > clusterings.
>> >
>> > A simple KMeans run with a fixed seed returns different results on the
>> > same training data if the training data is partitioned differently.
>> >
>> > Consider the following example. The same KMeans clustering set up is
>> run on
>> > identical data. The only difference is the partitioning of the training
>> data
>> > (one partition vs. four partitions).
>> >
>> > ```
>> > import org.apache.spark.sql.DataFrame
>> > import org.apache.spark.ml.clustering.KMeans
>> > import org.apache.spark.ml.features.VectorAssembler
>> >
>> > // generate random data for clustering
>> > val randomData = spark.range(1, 1000).withColumn("a",
>> rand(123)).withColumn("b", rand(321))
>> >
>> > val vecAssembler = new VectorAssembler().setInputCols(Array("a",
>> "b")).setOutputCol("features")
>> >
>> > val data = vecAssembler.transform(randomData)
>> >
>> > // instantiate KMeans with fixed seed
>> > val kmeans = new KMeans().setK(10).setSeed(9876L)
>> >
>> > // train the model with different partitioning
>> > val dataWith1Partition = data.repartition(1)
>> > println("1 Partition: " + kmeans.fit(dataWith1Partition)
>> .computeCost(dataWith1Partition))
>> >
>> > val dataWith4Partition = data.repartition(4)
>> > println("4 Partition: " + kmeans.fit(dataWith4Partition)
>> .computeCost(dataWith4Partition))
>> > ```
>> >
>> > I get the following related cost
>> >
>> > ```
>> > 1 Partition: 16.028212597888057
>> > 4 Partition: 16.14758460544976
>> > ```
>> >
>> > What I want to achieve is that repeated computations of the KMeans
>> Clustering should yield identical result on identical training data,
>> regardless of the partitioning.
>> >
>> > Looking through the Spark source code, I guess the cause is the
>> > initialization method of KMeans which in turn uses the `takeSample`
>> > method, which does not seem to be partition agnostic.

Re: KMeans Clustering is not Reproducible

2017-05-24 Thread Ankur Srivastava
Hi Christoph,

I am not an expert in ML and have not used Spark KMeans, but your problem
seems to be an issue of a local minimum vs. the global minimum. You should
run K-means multiple times with random starting points and also try
multiple values of K (unless you are already sure).
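
Something along these lines (a rough sketch, assuming the spark.ml API
and the assembled `data` DataFrame from your script):

```
import org.apache.spark.ml.clustering.KMeans

// try a few values of K and a few random seeds, keep the lowest cost
val runs = for {
  k    <- Seq(5, 10, 15)
  seed <- Seq(1L, 2L, 3L)
} yield {
  val model = new KMeans().setK(k).setSeed(seed).fit(data)
  (k, seed, model.computeCost(data))
}

val (bestK, bestSeed, bestCost) = runs.minBy(_._3)
println(s"best K=$bestK, seed=$bestSeed, cost=$bestCost")
```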

Hope this helps.

Thanks
Ankur



On Wed, May 24, 2017 at 2:15 AM, Christoph Bruecke 
wrote:

> Hi Anastasios,
>
> thanks for the reply but caching doesn’t seem to change anything.
>
> After further investigation it really seems that the RDD#takeSample method
> is the cause of the non-reproducibility.
>
> Is this considered a bug and should I open an Issue for that?
>
> BTW: my example script contains a little typo in line 3: it is `feature`
> not `features` (mind the `s`).
>
> Best,
> Christoph
>
> The script with caching
>
> ```
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.feature.VectorAssembler
> import org.apache.spark.storage.StorageLevel
>
> // generate random data for clustering
> val randomData = spark.range(1, 1000).withColumn("a",
> rand(123)).withColumn("b", rand(321))
>
> val vecAssembler = new VectorAssembler().setInputCols(Array("a",
> "b")).setOutputCol("features")
>
> val data = vecAssembler.transform(randomData)
>
> // instantiate KMeans with fixed seed
> val kmeans = new KMeans().setK(10).setSeed(9876L)
>
> // train the model with different partitioning
> val dataWith1Partition = data.repartition(1)
> // cache the data
> dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
> println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(
> dataWith1Partition))
>
> val dataWith4Partition = data.repartition(4)
> // cache the data
> dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
> println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(
> dataWith4Partition))
>
>
> ```
>
> Output:
>
> ```
> 1 Partition: 16.028212597888057
> 4 Partition: 16.14758460544976
> ```
>
> > On 22 May 2017, at 16:33, Anastasios Zouzias  wrote:
> >
> > Hi Christoph,
> >
> > Take a look at this, you might end up having a similar case:
> >
> > http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/
> >
> > If this is not the case, then I agree with you that kmeans should be
> > partitioning agnostic (although I haven't checked the code yet).
> >
> > Best,
> > Anastasios
> >
> >
> > On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke 
> wrote:
> > Hi,
> >
> > I’m trying to figure out how to use KMeans in order to achieve
> > reproducible results. I have found that running the same KMeans instance
> > on the same data with different partitioning will produce different
> > clusterings.
> >
> > A simple KMeans run with a fixed seed returns different results on the
> > same training data if the training data is partitioned differently.
> >
> > Consider the following example. The same KMeans clustering set up is run
> on
> > identical data. The only difference is the partitioning of the training
> data
> > (one partition vs. four partitions).
> >
> > ```
> > import org.apache.spark.sql.DataFrame
> > import org.apache.spark.ml.clustering.KMeans
> > import org.apache.spark.ml.features.VectorAssembler
> >
> > // generate random data for clustering
> > val randomData = spark.range(1, 1000).withColumn("a",
> rand(123)).withColumn("b", rand(321))
> >
> > val vecAssembler = new VectorAssembler().setInputCols(Array("a",
> "b")).setOutputCol("features")
> >
> > val data = vecAssembler.transform(randomData)
> >
> > // instantiate KMeans with fixed seed
> > val kmeans = new KMeans().setK(10).setSeed(9876L)
> >
> > // train the model with different partitioning
> > val dataWith1Partition = data.repartition(1)
> > println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(
> dataWith1Partition))
> >
> > val dataWith4Partition = data.repartition(4)
> > println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(
> dataWith4Partition))
> > ```
> >
> > I get the following related cost
> >
> > ```
> > 1 Partition: 16.028212597888057
> > 4 Partition: 16.14758460544976
> > ```
> >
> > What I want to achieve is that repeated computations of the KMeans
> Clustering should yield identical result on identical training data,
> regardless of the partitioning.
> >
> > Looking through the Spark source code, I guess the cause is the
> initialization method of KMeans which in turn uses the `takeSample` method,
> which does not seem to be partition agnostic.
> >
> > Is this behaviour expected? Is there anything I could do to achieve
> reproducible results?
> >
> > Best,
> > Christoph
> >
> >
> >
> >
> > --
> > -- Anastasios Zouzias
>
>
>
>


Re: KMeans Clustering is not Reproducible

2017-05-24 Thread Christoph Bruecke
Hi Anastasios,

thanks for the reply but caching doesn’t seem to change anything.

After further investigation it really seems that the RDD#takeSample method is 
the cause of the non-reproducibility.
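
A minimal way to check this in the spark-shell (a hypothetical snippet,
not from my original script):

```
// same data, same seed, different partition counts
val rdd1 = sc.parallelize(1 to 1000, 1)
val rdd4 = sc.parallelize(1 to 1000, 4)

// if takeSample were partition-agnostic, both lines would print the same
println(rdd1.takeSample(false, 5, 42L).mkString(", "))
println(rdd4.takeSample(false, 5, 42L).mkString(", "))
```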

Is this considered a bug and should I open an Issue for that?

BTW: my example script contains a little typo in line 3: it is `feature` not
`features` (mind the `s`).

Best,
Christoph

The script with caching

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.storage.StorageLevel

// generate random data for clustering
val randomData = spark.range(1, 1000).withColumn("a", 
rand(123)).withColumn("b", rand(321))

val vecAssembler = new VectorAssembler().setInputCols(Array("a", 
"b")).setOutputCol("features")

val data = vecAssembler.transform(randomData)

// instantiate KMeans with fixed seed
val kmeans = new KMeans().setK(10).setSeed(9876L)

// train the model with different partitioning
val dataWith1Partition = data.repartition(1)
// cache the data
dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
println("1 Partition: " + 
kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))

val dataWith4Partition = data.repartition(4)
// cache the data
dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
println("4 Partition: " + 
kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))


```

Output:

```
1 Partition: 16.028212597888057
4 Partition: 16.14758460544976
```
 
> On 22 May 2017, at 16:33, Anastasios Zouzias  wrote:
> 
> Hi Christoph,
> 
> Take a look at this, you might end up having a similar case:
> 
> http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/
> 
> If this is not the case, then I agree with you that kmeans should be
> partitioning agnostic (although I haven't checked the code yet).
> 
> Best,
> Anastasios
> 
> 
> On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke  
> wrote:
> Hi,
> 
> I’m trying to figure out how to use KMeans in order to achieve reproducible
> results. I have found that running the same KMeans instance on the same
> data with different partitioning will produce different clusterings.
>
> A simple KMeans run with a fixed seed returns different results on the same
> training data if the training data is partitioned differently.
> 
> Consider the following example. The same KMeans clustering set up is run on
> identical data. The only difference is the partitioning of the training data
> (one partition vs. four partitions).
> 
> ```
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.features.VectorAssembler
> 
> // generate random data for clustering
> val randomData = spark.range(1, 1000).withColumn("a", 
> rand(123)).withColumn("b", rand(321))
> 
> val vecAssembler = new VectorAssembler().setInputCols(Array("a", 
> "b")).setOutputCol("features")
> 
> val data = vecAssembler.transform(randomData)
> 
> // instantiate KMeans with fixed seed
> val kmeans = new KMeans().setK(10).setSeed(9876L)
> 
> // train the model with different partitioning
> val dataWith1Partition = data.repartition(1)
> println("1 Partition: " + 
> kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))
> 
> val dataWith4Partition = data.repartition(4)
> println("4 Partition: " + 
> kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))
> ```
> 
> I get the following related cost
> 
> ```
> 1 Partition: 16.028212597888057
> 4 Partition: 16.14758460544976
> ```
> 
> What I want to achieve is that repeated computations of the KMeans Clustering 
> should yield identical result on identical training data, regardless of the 
> partitioning.
> 
> Looking through the Spark source code, I guess the cause is the 
> initialization method of KMeans which in turn uses the `takeSample` method, 
> which does not seem to be partition agnostic.
> 
> Is this behaviour expected? Is there anything I could do to achieve 
> reproducible results?
> 
> Best,
> Christoph
> 
> 
> 
> 
> -- 
> -- Anastasios Zouzias





Re: KMeans Clustering is not Reproducible

2017-05-22 Thread Anastasios Zouzias
Hi Christoph,

Take a look at this, you might end up having a similar case:

http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/

If this is not the case, then I agree with you that kmeans should be
partitioning agnostic (although I haven't checked the code yet).
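
The gist of the post, as a quick sketch (reusing `vecAssembler` and
`randomData` from your script): a nondeterministic source such as `rand()`
may be re-evaluated on every action unless you materialize the data first:

```
// without caching, each action may recompute the rand() columns under
// whatever task layout is in effect at that moment; caching pins down
// one concrete materialization of the data
val data = vecAssembler.transform(randomData).cache()
data.count() // force materialization before calling fit()
```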

Best,
Anastasios


On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke 
wrote:

> Hi,
>
> I’m trying to figure out how to use KMeans in order to achieve
> reproducible results. I have found that running the same KMeans instance
> on the same data with different partitioning will produce different
> clusterings.
>
> A simple KMeans run with a fixed seed returns different results on the
> same training data if the training data is partitioned differently.
>
> Consider the following example. The same KMeans clustering set up is run on
> identical data. The only difference is the partitioning of the training
> data
> (one partition vs. four partitions).
>
> ```
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.features.VectorAssembler
>
> // generate random data for clustering
> val randomData = spark.range(1, 1000).withColumn("a",
> rand(123)).withColumn("b", rand(321))
>
> val vecAssembler = new VectorAssembler().setInputCols(Array("a",
> "b")).setOutputCol("features")
>
> val data = vecAssembler.transform(randomData)
>
> // instantiate KMeans with fixed seed
> val kmeans = new KMeans().setK(10).setSeed(9876L)
>
> // train the model with different partitioning
> val dataWith1Partition = data.repartition(1)
> println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(
> dataWith1Partition))
>
> val dataWith4Partition = data.repartition(4)
> println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(
> dataWith4Partition))
> ```
>
> I get the following related cost
>
> ```
> 1 Partition: 16.028212597888057
> 4 Partition: 16.14758460544976
> ```
>
> What I want to achieve is that repeated computations of the KMeans
> Clustering should yield identical result on identical training data,
> regardless of the partitioning.
>
> Looking through the Spark source code, I guess the cause is the
> initialization method of KMeans which in turn uses the `takeSample` method,
> which does not seem to be partition agnostic.
>
> Is this behaviour expected? Is there anything I could do to achieve
> reproducible results?
>
> Best,
> Christoph
>
>


-- 
-- Anastasios Zouzias