Re: [MLlib] kmeans random initialization, same seed every time
I'm sorry, I missed some important information: I use Spark version 2.0.2 with Scala 2.11.8.

2017-03-14 13:44 GMT+01:00 Julian Keppel <juliankeppel1...@gmail.com>:
> Hi everybody,
>
> I'm running some experiments with the Spark k-means implementation of the new
> DataFrame API. I compare clustering results of different runs with
> different parameters. I noticed that for the random initialization mode, the
> seed value is the same every time. How is it calculated? In my
> understanding, the seed should be random if it is not provided by the user.
>
> Thank you for your help.
>
> Julian
[MLlib] kmeans random initialization, same seed every time
Hi everybody,

I'm running some experiments with the Spark k-means implementation of the new DataFrame API. I compare clustering results of different runs with different parameters. I noticed that for the random initialization mode, the seed value is the same every time. How is it calculated? In my understanding, the seed should be random if it is not provided by the user.

Thank you for your help.

Julian
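[Archive note] The observed behavior is consistent with Spark ML's shared seed parameter defaulting to a deterministic value derived from the estimator's class name, rather than to a fresh random number. A minimal pure-Scala sketch of the idea (the class-name string is the only Spark-related assumption here; check the HasSeed source for your version):

```scala
// If no seed is set, Spark ML's HasSeed-style default is a hash of the
// class name -- the same value on every run:
val defaultSeed: Long = "org.apache.spark.ml.clustering.KMeans".hashCode.toLong

// For genuinely random initialization per run, set the seed explicitly,
// e.g. new KMeans().setSeed(scala.util.Random.nextLong())
val runSeed: Long = scala.util.Random.nextLong()

println(s"stable default seed: $defaultSeed, per-run seed: $runSeed")
```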
[Spark DataFrames/Streaming]: Bad performance with window function in streaming job
Hi,

I use Spark 2.0.2 and want to do the following: I extract features in a streaming job and then apply the records to a k-means model. Some of the features are simple ones which are calculated directly from the record. But I also have more complex features which depend on records from a specified time window before: they count how many connections in the last second were to the same host or service as the current one. I decided to use the SQL window functions for this.

So I build window specifications:

val hostCountWindow = Window.partitionBy("plainrecord.ip_dst").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
val serviceCountWindow = Window.partitionBy("service").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

And a function which is called to extract these features on every batch:

def extractTrafficFeatures(dataset: Dataset[Row]) = {
  dataset
    .withColumn("host_count", count(dataset("plainrecord.ip_dst")).over(hostCountWindow))
    .withColumn("srv_count", count(dataset("service")).over(serviceCountWindow))
}

And I use this function as follows:

stream.map(...).map(...).foreachRDD { rdd =>
  val dataframe = rdd.toDF(featureHeaders: _*).transform(extractTrafficFeatures(_))
  ...
}

The problem is that this has very bad performance: a batch needs between 1 and 3 seconds for an average input rate of less than 100 records per second. I guess it comes from the partitioning, which produces a lot of shuffling? Is there a better way to calculate these features on the streaming data? Or am I doing something wrong here?

Thank you for your help.
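[Archive note] The partitionBy/orderBy window spec does force a shuffle and per-partition sort on every micro-batch, which plausibly explains the latency. As a sanity check on semantics, this is the quantity each window count computes per row, sketched in plain Scala (Conn and its fields are illustrative names, not the poster's schema):

```scala
// Per row: how many connections with the same key (host or service)
// have a timestamp within the last second up to the current row.
case class Conn(key: String, ts: Long) // ts in seconds

def countsInLastSecond(rows: Seq[Conn]): Seq[Int] =
  rows.map(r => rows.count(o => o.key == r.key && o.ts <= r.ts && o.ts >= r.ts - 1L))

val sample = Seq(Conn("a", 0), Conn("a", 1), Conn("a", 3), Conn("b", 1))
println(countsInLastSecond(sample)) // one count per input row
```

A quadratic scan like this is only for illustration; the point is that the same per-key counts could also be kept incrementally (e.g. in keyed state) instead of re-sorting every batch.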
Re: Kafka direct approach,App UI shows wrong input rate
Oh, sorry, I made a mistake: it's Spark version 2.0.1, not 2.0.2. When I wrote the initial message I had built my app with 2.0.2 and deployed it on a cluster with 2.0.1, so I thought this could be the problem. But now I have built my app with 2.0.1 and the problem still remains.

2016-11-19 18:06 GMT+01:00 Cody Koeninger <c...@koeninger.org>:
> There have definitely been issues with UI reporting for the direct
> stream in the past, but I'm not able to reproduce this with 2.0.2 and
> 0.8. See below:
>
> https://i.imgsafe.org/086019ae57.png
>
> On Fri, Nov 18, 2016 at 4:38 AM, Julian Keppel
> <juliankeppel1...@gmail.com> wrote:
> > Hello,
> >
> > I use Spark 2.0.2 with Kafka integration 0-8. The Kafka version is 0.10.0.1
> > (Scala 2.11). I read data from Kafka with the direct approach. The complete
> > infrastructure runs on Google Container Engine.
> >
> > I wonder why the corresponding application UI says the input rate is zero
> > records per second. This is definitely wrong; I checked it while I printed
> > the incoming records to the driver console. All other metrics seem to be
> > correct (at least they are realistic).
> >
> > What is going on here? Do you have any idea? Thanks for your help.
> >
> > Julian
Re: using StreamingKMeans
I do research in anomaly detection with machine learning methods at the moment, and currently I do k-means clustering too, in an offline learning setting. In further work we want to compare the two paradigms of offline and online learning. I would like to share some thoughts on this discussion.

My offline setting is exactly what Ayan Guha explained: we collect data for training and testing over a few days/weeks/months and train the model periodically, let's say once a week. Please note that k-means is unsupervised, so it has no idea what your data is about or what could be "normal" or an anomaly. So in my opinion the training dataset has to represent a state in which everything occurs: the normal data points and also any anomalies. Referring to "Anomaly Detection: A Survey" by Varun Chandola et al. (2009), there are then different methods of interpreting the results. As an example: "Normal data instances belong to large and dense clusters, while anomalies either belong to small or sparse clusters". So potential anomalies have to be present in your training dataset, I think.

The online learning setting is meant to adapt to rapid changes in your environment. For example, if you are analyzing network traffic and you add a new service which produces a lot of traffic (a lot of users use the new service), then in an offline setting where you learn just once a week, your new service may produce a false alarm, whereas the online model would adapt to these changes (depending on the configured forgetfulness). There are use cases with a very dynamic environment (for example flight ticket prices), where you need to adapt your model rapidly (see for example: https://youtu.be/wyfTjd9z1sY).

2016-11-20 2:11 GMT+01:00 Debasish Ghosh:
> I share both the concerns that you have expressed. And as I mentioned in my
> earlier mail, offline (batch) training is an option if I get a dataset
> without outliers. In that case I can train and have a model.
> I find the
> model parameters, which will be the mean distance to the centroid. Note in
> training I will have only 1 cluster, as it's only normal data (no outliers).
>
> I can now pass these parameters to the prediction phase, which can work on
> streaming data. In the prediction phase I just compute the distance to the
> centroid for each point and flag the violating ones as outliers.
>
> This looks like a perfectly valid option if I get a dataset with no
> outliers to train on.
>
> Now my question is: what then is the use case in which we can use
> StreamingKMeans? In the above scenario we use batch KMeans in the training
> phase while we just compute the distance in the prediction phase. And how
> do we address the scenario where we have only one stream of data available?
>
> regards.
>
> On Sun, 20 Nov 2016 at 6:07 AM, ayan guha wrote:
>
>> Here are 2 concerns I would have with the design (this discussion is
>> mostly to validate my own understanding):
>>
>> 1. If you have outliers "before" running k-means, don't your centroids
>> get skewed? In other words, outliers by themselves may bias the cluster
>> evaluation, mightn't they?
>> 2. Typically microbatches are small, like 3 sec in your case. In this
>> window you may not have enough data to run any statistically significant
>> operation, can you?
>>
>> My approach would have been: run k-means on data without outliers (in
>> batch mode). Determine the model, i.e. the centroids in the case of k-means.
>> Then load the model in your streaming app and just apply an "outlier
>> detection" function, which takes the form of
>>
>> def detectOutlier(model, data):
>>     # your code, like mean distance etc.
>>     return T or F
>>
>> In response to your point about an "alternate set of data", I would assume
>> you would accumulate the data you are receiving from streaming over a few
>> weeks or months before running offline training.
>>
>> Am I missing something?
>>
>> On Sun, Nov 20, 2016 at 10:29 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>> Looking for alternative suggestions for the case where we have one continuous
>> stream of data. Offline training and online prediction can be an option if
>> we have an alternate set of data to train on. But if it's one single
>> stream, you don't have separate sets for training or cross validation.
>>
>> So whatever data you get in each micro batch, you train on it and get the
>> cluster centroids from the model. Then you apply some heuristic like mean
>> distance from the centroid and detect outliers. So for every microbatch you
>> get the outliers based on the model, and you can control the forgetfulness
>> of the model through the decay factor that you specify for StreamingKMeans.
>>
>> Suggestions?
>>
>> regards.
>>
>> On Sun, 20 Nov 2016 at 3:51 AM, ayan guha wrote:
>>
>> Curious, why do you want to train your models every 3 secs?
>> On 20 Nov 2016 06:25, "Debasish Ghosh" wrote:
>>
>> Thanks a lot
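[Archive note] The mean-distance heuristic discussed in this thread can be sketched in a few lines of plain Scala (illustrative names, not the MLlib API): a point is flagged when its distance to the trained centroid exceeds some multiple of the mean distance observed during training.

```scala
// Euclidean distance between two dense points.
def euclidean(a: Array[Double], b: Array[Double]): Double =
  math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

// Flag a point whose distance to the centroid exceeds `factor` times
// the mean distance seen in (outlier-free) training data.
def isOutlier(point: Array[Double], centroid: Array[Double],
              meanDist: Double, factor: Double = 3.0): Boolean =
  euclidean(point, centroid) > factor * meanDist

// Example: distance 5.0 from the centroid, mean training distance 1.0.
println(isOutlier(Array(3.0, 4.0), Array(0.0, 0.0), meanDist = 1.0))
```

The threshold factor is a tuning knob; the survey cited above discusses more principled alternatives for scoring cluster-based anomalies.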
Kafka direct approach,App UI shows wrong input rate
Hello,

I use Spark 2.0.2 with Kafka integration 0-8. The Kafka version is 0.10.0.1 (Scala 2.11). I read data from Kafka with the direct approach. The complete infrastructure runs on Google Container Engine.

I wonder why the corresponding application UI says the input rate is zero records per second. This is definitely wrong; I checked it while I printed the incoming records to the driver console. All other metrics seem to be correct (at least they are realistic).

What is going on here? Do you have any idea? Thanks for your help.

Julian
Re: Want to test spark-sql-kafka but get unresolved dependency error
Okay, thank you! Can you say when this feature will be released?

2016-10-13 16:29 GMT+02:00 Cody Koeninger:
> As Sean said, it's unreleased. If you want to try it out, build Spark:
>
> http://spark.apache.org/docs/latest/building-spark.html
>
> The easiest way to include the jar is probably to use mvn install to
> put it in your local repository, then link it in your application's
> mvn or sbt build file as described in the docs you linked.
>
> On Thu, Oct 13, 2016 at 3:24 AM, JayKay wrote:
> > I want to work with the Kafka integration for structured streaming. I use
> > Spark version 2.0.0, and I start the spark-shell with:
> >
> > spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.0
> >
> > As described here:
> > https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md
> >
> > But I get an unresolved dependency error ("unresolved dependency:
> > org.apache.spark#spark-sql-kafka-0-10_2.11;2.0.0: not found"). So it seems
> > not to be available via Maven or spark-packages.
> >
> > How can I access this package? Or am I doing something wrong/missing?
> >
> > Thank you for your help.
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Want-to-test-spark-sql-kafka-but-get-unresolved-dependency-error-tp27891.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
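[Archive note] The connector was later published under these Maven coordinates; once on a released Spark version that includes it, an sbt build can pull it in along the lines below (the version shown is an assumption; match it to your Spark version):

```scala
// build.sbt -- version is illustrative; use your Spark release's version
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.0.2"
```

The `--packages org.apache.spark:spark-sql-kafka-0-10_2.11:<version>` form for spark-shell works the same way once the artifact exists on Maven Central.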
Re: Sharing object/state across transformations
Yes, but what they do is only add new elements to a state which is passed as a parameter. My problem is that my "counter" (the HyperLogLog object) comes from outside and is not passed to the function. So I have to track the state of this "external" HLL object across the whole lifecycle of the stream, and that doesn't seem to work just like that. Spark copies all necessary objects from the enclosing scope to all the worker nodes, doesn't it? So I need a mechanism to use one single HyperLogLog object and share its state across the entire application... Sorry if the problem wasn't clear.

2015-12-02 15:03 GMT+01:00 Ted Yu:
>
>> Have you taken a look
>> at streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java?
>>
>> especially testUpdateStateByKeyWithInitial()
>>
>> Cheers
>>
>> On Wed, Dec 2, 2015 at 2:54 AM, JayKay wrote:
>>
>>> I'm new to Apache Spark and an absolute beginner. I'm playing around with
>>> Spark Streaming (API version 1.5.1) in Java and want to implement a
>>> prototype which uses HyperLogLog to estimate distinct elements. I use the
>>> stream-lib from Clearspring (https://github.com/addthis/stream-lib).
>>>
>>> I planned to use updateStateByKey to hold a global state over all events.
>>> The problem is that for every call of the specified function, my HLL
>>> returns a 1 (it seems to use a new instance of my HLL object every time).
>>> The same problem occurs with a simple global integer variable which I
>>> tried to increment in every function call. This also always has the
>>> initial value in it.
>>>
>>> This is a code snippet where I define the update function:
>>>
>>> Function2<List<String>, Optional<Long>, Optional<Long>> hllCountFunction
>>>     = new Function2<List<String>, Optional<Long>, Optional<Long>>() {
>>>   public Optional<Long> call(List<String> values, Optional<Long> state)
>>>       throws Exception {
>>>     values.stream().forEach(value -> hll.offer(value));
>>>     long newState = state.isPresent() ? hll.cardinality() : 0;
>>>     return Optional.of(newState);
>>>   }
>>> };
>>>
>>> And this is the snippet showing how I use the function:
>>>
>>> JavaPairDStream<String, Long> hllCounts = fullvisitorids.mapToPair(
>>>     new PairFunction<String, String, String>() {
>>>   public Tuple2<String, String> call(String value) {
>>>     return new Tuple2<String, String>("key", value);
>>>   }
>>> }).updateStateByKey(hllCountFunction);
>>>
>>> After a lot of research I found the concept of Accumulators. Do I need to
>>> specify a custom Accumulator by extending the Accumulator class (in
>>> Java)? I also read that for transformations these should only be used for
>>> debugging purposes...
>>>
>>> So how can I achieve using one globally defined HLL object in a Spark
>>> stream transformation? I also tried to implement a custom Accumulator,
>>> but this also failed because I don't understand how to use the
>>> AccumulableParam interface. I implemented the Accumulator and overrode
>>> the add and value methods. But what do I have to do in the
>>> AccumulableParam with addAccumulator, addInPlace and zero?
>>>
>>> Thanks in advance for your help and your advice!
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Sharing-object-state-accross-transformations-tp25544.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
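[Archive note] The usual fix is to make the accumulated structure part of the state that updateStateByKey threads through, rather than an object captured from the enclosing scope: closures are serialized out to the workers, and mutations made there are never sent back. A minimal sketch of the state-threading pattern in plain Scala, using an exact distinct set where the HLL's serialized bytes would go (all names here are illustrative):

```scala
// The accumulated structure IS the state value: it enters as the previous
// state, is merged with the batch's values, and is returned as new state.
def updateDistinct(newValues: Seq[String],
                   state: Option[Set[String]]): Option[Set[String]] = {
  val merged = state.getOrElse(Set.empty[String]) ++ newValues
  Some(merged) // with stream-lib, this would carry the HLL's serialized form
}

// Simulate two consecutive micro-batches for one key:
val afterBatch1 = updateDistinct(Seq("a", "b"), None)
val afterBatch2 = updateDistinct(Seq("b", "c"), afterBatch1)
println(afterBatch2.map(_.size)) // distinct count so far
```

An Accumulator is the other direction (worker-to-driver aggregation), but as noted in the quoted thread it is unreliable inside transformations; keeping the sketch inside the keyed state avoids that entirely.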