Re: KMeans Clustering is not Reproducible
Hi Ankur,

thank you for answering. My problem is not that I'm stuck in a local minimum, but rather the reproducibility of KMeans itself. What I'm trying to achieve is: when the input data and all the parameters stay the same, especially the seed, I want to get exactly the same results, even when the partitioning changes. As I understand it, setting a seed on an ML algorithm should make that algorithm deterministic. Unfortunately I couldn't find any information on whether this is a goal of Spark's MLlib or not.

A little bit of background: I'm trying to benchmark some ML algorithms while changing my cluster config, i.e. I want to find the best cluster config that still achieves the same results. But what I see is that when I change the number of executors, the results differ and therefore become incomparable.

So in essence my question is: are the algorithms in MLlib partition-agnostic or not?

Thanks for your help,
Christoph
Re: KMeans Clustering is not Reproducible
I agree with what Ankur said. The k-means seeding step (the `takeSample` method) runs in parallel, so each partition draws its sample points from its local data; that is why the result depends on the partitioning. The seeding is based on the k-means|| algorithm of Bahmani et al., which gives approximation guarantees on the k-means cost. You could supply the initial seeding points yourself, which avoids the partition-dependence issue.

Regards,
Yu Zhang
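To illustrate the suggestion of fixed seeding points: the RDD-based `spark.mllib` API exposes `setInitialModel`, which skips the parallel sampling entirely (the DataFrame-based `spark.ml` KMeans in 2.x does not seem to have such a setter). A rough, untested sketch; the center values here are made up, and `trainingRdd` is assumed to be prepared from the same (a, b) columns as in the scripts in this thread:

```scala
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Hypothetical fixed starting centers; any deterministic choice works,
// e.g. k points collected and sorted on the driver.
val initialCenters = Array(
  Vectors.dense(0.1, 0.1),
  Vectors.dense(0.5, 0.5),
  Vectors.dense(0.9, 0.9)
)

// With the initial model fixed, no randomized seeding happens at all.
val kmeans = new KMeans()
  .setK(initialCenters.length)
  .setMaxIterations(20)
  .setInitialModel(new KMeansModel(initialCenters))

// trainingRdd: RDD[org.apache.spark.mllib.linalg.Vector]
// val model = kmeans.run(trainingRdd)
```

With the initialization pinned down, the remaining Lloyd iterations should be partition-independent up to floating-point summation order.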
Re: KMeans Clustering is not Reproducible
Hi Christoph,

I am not an expert in ML and have not used Spark KMeans, but your problem seems to be an issue of local minimum vs. global minimum. You should run k-means multiple times with random starting points, and also try multiple values of K (unless you are already sure).

Hope this helps.

Thanks,
Ankur
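The multiple-restart suggestion can be sketched with the DataFrame API used elsewhere in this thread (untested; `data` is the assembled DataFrame from the example scripts):

```scala
import org.apache.spark.ml.clustering.KMeans

// For a fixed k, rerun the fit with several seeds and keep the
// lowest-cost model as the best local optimum found.
val k = 10
val runs = Seq(1L, 2L, 3L, 4L).map { seed =>
  val model = new KMeans().setK(k).setSeed(seed).fit(data)
  (seed, model, model.computeCost(data))
}
val (bestSeed, bestModel, bestCost) = runs.minBy(_._3)
println(s"k=$k, best seed=$bestSeed, cost=$bestCost")
```

Note that costs are only comparable for the same k, since the k-means cost always decreases as k grows; choosing k itself needs a separate criterion (e.g. an elbow plot).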
Re: KMeans Clustering is not Reproducible
Hi Anastasios,

thanks for the reply, but caching doesn't seem to change anything.

After further investigation it really seems that the RDD#takeSample method is the cause of the non-reproducibility.

Is this considered a bug, and should I open an issue for it?

BTW: my example script contains a little typo in line 3: it is `feature`, not `features` (mind the `s`).

Best,
Christoph

The script with caching:

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.storage.StorageLevel

// generate random data for clustering
val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321))

val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features")

val data = vecAssembler.transform(randomData)

// instantiate KMeans with fixed seed
val kmeans = new KMeans().setK(10).setSeed(9876L)

// train the model with different partitioning
val dataWith1Partition = data.repartition(1)
// cache the data
dataWith1Partition.persist(StorageLevel.MEMORY_AND_DISK)
println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))

val dataWith4Partition = data.repartition(4)
// cache the data
dataWith4Partition.persist(StorageLevel.MEMORY_AND_DISK)
println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))
```

Output:

```
1 Partition: 16.028212597888057
4 Partition: 16.14758460544976
```
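The `takeSample` dependence can be checked directly in the shell, without KMeans at all. A small untested sketch (spark-shell, with `sc` available):

```scala
// Same elements, same seed, different partitioning: takeSample draws
// from each partition locally, so the sampled elements can differ.
val oneP  = sc.parallelize(1 to 1000, numSlices = 1)
val fourP = sc.parallelize(1 to 1000, numSlices = 4)

println(oneP.takeSample(withReplacement = false, num = 5, seed = 42L).mkString(", "))
println(fourP.takeSample(withReplacement = false, num = 5, seed = 42L).mkString(", "))
```

If the two lines print different samples, the non-determinism is reproduced independently of MLlib.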
Re: KMeans Clustering is not Reproducible
Hi Christoph,

Take a look at this, you might end up having a similar case:

http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/

If this is not the case, then I agree with you that k-means should be partitioning-agnostic (although I haven't checked the code yet).

Best,
Anastasios

On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke wrote:

> Hi,
>
> I'm trying to figure out how to use KMeans in order to achieve
> reproducible results. I have found that running the same KMeans instance
> on the same data, but with different partitioning, will produce different
> clusterings. That is, a simple KMeans run with a fixed seed returns
> different results on the same training data if the training data is
> partitioned differently.
>
> Consider the following example. The same KMeans clustering setup is run
> on identical data. The only difference is the partitioning of the
> training data (one partition vs. four partitions).
>
> ```
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.features.VectorAssembler
>
> // generate random data for clustering
> val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321))
>
> val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features")
>
> val data = vecAssembler.transform(randomData)
>
> // instantiate KMeans with fixed seed
> val kmeans = new KMeans().setK(10).setSeed(9876L)
>
> // train the model with different partitioning
> val dataWith1Partition = data.repartition(1)
> println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))
>
> val dataWith4Partition = data.repartition(4)
> println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))
> ```
>
> I get the following related cost:
>
> ```
> 1 Partition: 16.028212597888057
> 4 Partition: 16.14758460544976
> ```
>
> What I want to achieve is that repeated computations of the KMeans
> clustering should yield identical results on identical training data,
> regardless of the partitioning.
>
> Looking through the Spark source code, I guess the cause is the
> initialization method of KMeans, which in turn uses the `takeSample`
> method, which does not seem to be partition-agnostic.
>
> Is this behaviour expected? Is there anything I could do to achieve
> reproducible results?
>
> Best,
> Christoph

--
-- Anastasios Zouzias