Re: [MLlib] kmeans random initialization, same seed every time

2017-03-14 Thread Julian Keppel
I'm sorry, I missed some important information: I use Spark version 2.0.2
with Scala 2.11.8.

2017-03-14 13:44 GMT+01:00 Julian Keppel <juliankeppel1...@gmail.com>:

> Hi everybody,
>
> I am running some experiments with the Spark k-means implementation of the
> new DataFrame API. I compare the clustering results of different runs with
> different parameters. I noticed that for the random initialization mode, the
> seed value is the same every time. How is it calculated? In my understanding,
> the seed should be random if it is not provided by the user.
>
> Thank you for your help.
>
> Julian
>


[MLlib] kmeans random initialization, same seed every time

2017-03-14 Thread Julian Keppel
Hi everybody,

I am running some experiments with the Spark k-means implementation of the
new DataFrame API. I compare the clustering results of different runs with
different parameters. I noticed that for the random initialization mode, the
seed value is the same every time. How is it calculated? In my understanding,
the seed should be random if it is not provided by the user.

Thank you for your help.

Julian
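
For what it's worth, in spark.ml the default value of the seed parameter is a
deterministic hash of the estimator's class name rather than a fresh random
number, which would explain identical results across runs. A minimal sketch of
forcing a different seed per run; k and the input DataFrame are placeholders:

import org.apache.spark.ml.clustering.KMeans
import scala.util.Random

val kmeans = new KMeans()
  .setK(10)
  .setInitMode("random")
  .setSeed(Random.nextLong())   // without this, the default seed is identical on every run

val model = kmeans.fit(featureDF)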


[Spark DataFrames/Streaming]: Bad performance with window function in streaming job

2017-01-16 Thread Julian Keppel
Hi,

I use Spark 2.0.2 and want to do the following:

I extract features in a streaming job and then apply the records to a
k-means model. Some of the features are simple ones which are calculated
directly from the record. But I also have more complex features which depend
on the records from a preceding time window. They count how many connections
in the last second went to the same host or service as the current one. I
decided to use the SQL window functions for this.

So I build window specifications:

val hostCountWindow = Window.partitionBy("plainrecord.ip_dst")
  .orderBy(desc("timestamp"))
  .rangeBetween(-1L, 0L)

val serviceCountWindow = Window.partitionBy("service")
  .orderBy(desc("timestamp"))
  .rangeBetween(-1L, 0L)

And a function which is called to extract these features on every batch:

def extractTrafficFeatures(dataset: Dataset[Row]) = {
  dataset
    .withColumn("host_count", count(dataset("plainrecord.ip_dst")).over(hostCountWindow))
    .withColumn("srv_count", count(dataset("service")).over(serviceCountWindow))
}

And I use this function as follows:

stream.map(...).map(...).foreachRDD { rdd =>
  val dataframe = rdd.toDF(featureHeaders: _*).transform(extractTrafficFeatures(_))
  ...
}

The problem is that this performs very badly. A batch takes between 1 and 3
seconds at an average input rate of less than 100 records per second. I guess
this comes from the partitioning, which produces a lot of shuffling? Is there
a better way to calculate these features on the streaming data? Or am I doing
something wrong here?
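
One way to check the shuffle suspicion is to print the physical plan of a
single batch and look for Exchange and Sort operators feeding the Window
nodes. This is only a diagnostic sketch that reuses the names from the
snippets above, inside the foreachRDD body:

// Diagnostic only: the extended plan typically shows one Exchange (shuffle)
// plus Sort per window specification.
val dataframe = rdd.toDF(featureHeaders: _*).transform(extractTrafficFeatures(_))
dataframe.explain(true)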

Thank you for your help.


Re: Kafka direct approach,App UI shows wrong input rate

2016-11-22 Thread Julian Keppel
Oh, sorry, I made a mistake... It's Spark version 2.0.1, not 2.0.2.

When I wrote the initial message I had built my app with 2.0.2 and deployed it
on a cluster running 2.0.1. So I thought this could be the problem. But now I
have changed it and built my app with 2.0.1, and the problem still remains.

2016-11-19 18:06 GMT+01:00 Cody Koeninger <c...@koeninger.org>:

> There have definitely been issues with UI reporting for the direct
> stream in the past, but I'm not able to reproduce this with 2.0.2 and
> 0.8.  See below:
>
> https://i.imgsafe.org/086019ae57.png
>
>
>
> On Fri, Nov 18, 2016 at 4:38 AM, Julian Keppel
> <juliankeppel1...@gmail.com> wrote:
> > Hello,
> >
> > I use Spark 2.0.2 with Kafka integration 0-8. The Kafka version is
> > 0.10.0.1 (Scala 2.11). I read data from Kafka with the direct approach.
> > The complete infrastructure runs on Google Container Engine.
> >
> > I wonder why the corresponding application UI says the input rate is
> > zero records per second. This is definitely wrong. I checked it while I
> > printed out the incoming records to the driver console. All other
> > metrics seem to be correct (at least they are realistic).
> >
> > What is going on here? Do you have any idea? Thanks for your help.
> >
> > Julian
>


Re: using StreamingKMeans

2016-11-21 Thread Julian Keppel
I am doing research in anomaly detection with machine learning methods at the
moment, and currently I also do k-means clustering in an offline learning
setting. In further work we want to compare the two paradigms of offline and
online learning. I would like to share some thoughts on this discussion.

My offline setting is exactly what Ayan Guha explained: we collect data for
training and testing over a few days/weeks/months and train the model
periodically, let's say once a week for example. Please note that k-means is
unsupervised, so it doesn't have any idea of what your data is about and what
could be "normal" or an anomaly. So in my opinion the training dataset has to
represent a state in which everything occurs: the normal data points as well
as any anomalies. Referring to "Anomaly Detection: A Survey" by Varun Chandola
et al. (2009), there are then different methods of interpreting the results.
As an example: "Normal data instances belong to large and dense clusters,
while anomalies either belong to small or sparse clusters". So potential
anomalies have to be present in your training dataset, I think.
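
A rough sketch of the "small or sparse clusters" interpretation with the
spark.ml API; the input DataFrame, k = 20 and the cluster-size cutoff are
illustrative assumptions, not values from this thread:

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.functions.col

// Cluster the (unlabeled) training data, then flag members of very small
// clusters as anomaly candidates.
val model    = new KMeans().setK(20).setFeaturesCol("features").fit(trainingDF)
val assigned = model.transform(trainingDF)            // adds a "prediction" column
val sizes    = assigned.groupBy("prediction").count()
val suspiciousClusters = sizes.filter(col("count") < 50)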

The online learning setting is meant to adapt to rapid changes in your
environment. So for example, if you are analyzing network traffic and you add
a new service which produces a lot of traffic (because a lot of users use the
new service), then in an offline setting where you learn just once a week,
your new service may produce a false alarm, whereas the online model would
adapt to these changes (depending on the configured forgetfulness). There are
use cases where you have a very dynamic environment (for example flight
ticket prices), where you need to adapt your model rapidly (see for example
here: https://youtu.be/wyfTjd9z1sY).

2016-11-20 2:11 GMT+01:00 Debasish Ghosh :

> I share both of the concerns that you have expressed. And as I mentioned in
> my earlier mail, offline (batch) training is an option if I get a dataset
> without outliers. In that case I can train and have a model. I find the
> model parameters, which will be the mean distance to the centroid. Note that
> in training I will have only one cluster, as it's only normal data (no
> outliers).
>
> I can now pass these parameters to the prediction phase, which can work on
> streaming data. In the prediction phase I just compute the distance to the
> centroid for each point and flag the violating ones as outliers.
>
> This looks like a perfectly valid option if I get a dataset with no
> outliers to train on.
>
> Now my question is: what then is the use case in which we can use
> StreamingKMeans? In the above scenario we use batch KMeans in the training
> phase, while we just compute the distance in the prediction phase. And how
> do we address the scenario where we have only one stream of data available?
>
> regards.
>
> On Sun, 20 Nov 2016 at 6:07 AM, ayan guha  wrote:
>
>> Here are 2 concerns I would have with the design (this discussion is
>> mostly to validate my own understanding):
>>
>> 1. If you have outliers "before" running k-means, won't your centroids
>> get skewed? In other words, outliers by themselves may bias the cluster
>> evaluation, won't they?
>> 2. Typically micro-batches are small, like 3 sec in your case. In this
>> window you may not have enough data to run any statistically significant
>> operation, can you?
>>
>> My approach would have been: run k-means on data without outliers (in
>> batch mode). Determine the model, i.e. the centroids in the case of
>> k-means. Then load the model in your streaming app and just apply an
>> "outlier detection" function, which takes the form of:
>>
>> def detectOutlier(model,data):
>>   /// your code, like mean distance etc
>>   return T or F
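
In Scala, a sketch of that detectOutlier function could look as follows,
assuming the model comes from an offline mllib KMeans run and the threshold
(for example a multiple of the mean training distance) has been computed
beforehand:

import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Distance of a point to its nearest centroid, compared against a
// precomputed threshold; returns true for suspected outliers.
def detectOutlier(model: KMeansModel, point: Vector, threshold: Double): Boolean = {
  val centroid = model.clusterCenters(model.predict(point))
  math.sqrt(Vectors.sqdist(point, centroid)) > threshold
}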
>>
>> In response to your point about an "alternate set of data", I would assume
>> you would accumulate the data you are receiving from streaming over a few
>> weeks or months before running the offline training.
>>
>> Am I missing something?
>>
>> On Sun, Nov 20, 2016 at 10:29 AM, Debasish Ghosh <
>> ghosh.debas...@gmail.com> wrote:
>>
>> Looking for alternative suggestions for the case where we have one
>> continuous stream of data. Offline training and online prediction can be an
>> option if we can have an alternate set of data to train on. But if it's one
>> single stream, you don't have separate sets for training or cross-validation.
>>
>> So whatever data you get in each micro-batch, train on it and you get the
>> cluster centroids from the model. Then apply some heuristic like the mean
>> distance from the centroid and detect outliers. So for every micro-batch you
>> get the outliers based on the model, and you can control the forgetfulness
>> of the model through the decay factor that you specify for StreamingKMeans.
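
A rough Scala sketch of this train-then-score loop with mllib's
StreamingKMeans; the feature DStream, k, decay factor, dimensionality and
distance threshold are placeholders, not values from this thread:

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}

val skm = new StreamingKMeans()
  .setK(5)
  .setDecayFactor(0.5)                      // forgetfulness: lower forgets old batches faster
  .setRandomCenters(dim = 10, weight = 0.0)

skm.trainOn(featureStream)                  // featureStream: DStream[Vector]

featureStream.foreachRDD { rdd =>
  val current = skm.latestModel()           // centroids after the most recent update
  val outliers = rdd.filter { v =>
    val c = current.clusterCenters(current.predict(v))
    math.sqrt(Vectors.sqdist(v, c)) > 3.0   // assumed distance threshold
  }
  // ... report or persist the outliers
}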
>>
>> Suggestions ?
>>
>> regards.
>>
>> On Sun, 20 Nov 2016 at 3:51 AM, ayan guha  wrote:
>>
>> Curious: why do you want to train your models every 3 secs?
>> On 20 Nov 2016 06:25, "Debasish Ghosh"  wrote:
>>
>> Thanks a lot 

Kafka direct approach,App UI shows wrong input rate

2016-11-18 Thread Julian Keppel
Hello,

I use Spark 2.0.2 with Kafka integration 0-8. The Kafka version is 0.10.0.1
(Scala 2.11). I read data from Kafka with the direct approach. The complete
infrastructure runs on Google Container Engine.
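
For reference, a minimal sketch of that direct-approach setup with
spark-streaming-kafka-0-8 in Scala; the broker address, topic name and batch
interval are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))   // sc: an existing SparkContext
val kafkaParams = Map("metadata.broker.list" -> "kafka-broker:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))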

I wonder why the corresponding application UI says the input rate is zero
records per second. This is definitely wrong. I checked it while I printed
out the incoming records to the driver console. All other metrics seem to
be correct (at least they are realistic).

What is going on here? Do you have any idea? Thanks for your help.

Julian


Re: Want to test spark-sql-kafka but get unresolved dependency error

2016-10-14 Thread Julian Keppel
Okay, thank you! Can you say when this feature will be released?

2016-10-13 16:29 GMT+02:00 Cody Koeninger :

> As Sean said, it's unreleased. If you want to try it out, build Spark:
>
> http://spark.apache.org/docs/latest/building-spark.html
>
> The easiest way to include the jar is probably to use mvn install to
> put it in your local repository, then link it in your application's
> mvn or sbt build file as described in the docs you linked.
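
In sbt, that linking step might look roughly like this; the exact version
string depends on the branch that was built and installed locally and is only
an assumption here:

resolvers += Resolver.mavenLocal
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0-SNAPSHOT"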
>
>
> On Thu, Oct 13, 2016 at 3:24 AM, JayKay 
> wrote:
> > I want to work with the Kafka integration for structured streaming. I use
> > Spark version 2.0.0, and I start the spark-shell with:
> >
> > spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.0.0
> >
> > As described here:
> > https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md
> >
> > But I get an unresolved dependency error ("unresolved dependency:
> > org.apache.spark#spark-sql-kafka-0-10_2.11;2.0.0: not found"). So it
> > seems not to be available via Maven or spark-packages.
> >
> > How can I access this package? Or am I doing something wrong/missing?
> >
> > Thank you for your help.
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Want-to-test-spark-sql-kafka-but-get-unresolved-dependency-error-tp27891.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
>


Re: Sharing object/state accross transformations

2015-12-06 Thread Julian Keppel
Yes, but what they do is only add new elements to a state which is passed as
a parameter. My problem is that my "counter" (the HyperLogLog object) comes
from outside and is not passed to the function. So I have to track the state
of this "external" HLL object across the whole lifecycle of the stream. And
that doesn't seem to work just like that. Spark copies all necessary objects
from the enclosing scope to all the worker nodes, doesn't it? So I need a
mechanism to use one single HyperLogLog object and share its state across the
entire application... Sorry if the problem wasn't clear.
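
A hedged Scala sketch (not the original Java code) of the usual workaround:
keep the HyperLogLog itself inside the state that updateStateByKey carries,
instead of capturing an external object. fullvisitorids is the DStream from
the original post; the precision value 12 is an arbitrary assumption:

import com.clearspring.analytics.stream.cardinality.HyperLogLog

// Requires a checkpoint directory (ssc.checkpoint(...)) as usual for stateful ops.
val updateHll = (values: Seq[String], state: Option[HyperLogLog]) => {
  val hll = state.getOrElse(new HyperLogLog(12))   // reuse the previous HLL if present
  values.foreach(v => hll.offer(v))
  Some(hll)                                        // the HLL object itself is the state
}

val cardinalities = fullvisitorids
  .map(v => ("key", v))
  .updateStateByKey(updateHll)
  .mapValues(_.cardinality())                      // DStream[(String, Long)]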


2015-12-02 15:03 GMT+01:00 Ted Yu :
>
>> Have you taken a look
>> at streaming//src/test/java/org/apache/spark/streaming/JavaAPISuite.java ?
>>
>> especially testUpdateStateByKeyWithInitial()
>>
>> Cheers
>>
>> On Wed, Dec 2, 2015 at 2:54 AM, JayKay 
>> wrote:
>>
>>> I'm new to Apache Spark and an absolute beginner. I'm playing around with
>>> Spark Streaming (API version 1.5.1) in Java and want to implement a
>>> prototype which uses HyperLogLog to estimate distinct elements. I use the
>>> stream-lib from Clearspring (https://github.com/addthis/stream-lib).
>>>
>>> I planned to use updateStateByKey to hold a global state over all events.
>>> The problem is that for every call of the specified function, my HLL
>>> returns 1 (it seems to use a new instance of my HLL object every time).
>>> The same problem occurs with a simple global integer variable which I
>>> tried to increment in every function call. It also always contains the
>>> initial value.
>>>
>>> This is a code snippet where I define the update function:
>>>
>>> Function2<List<String>, Optional<Long>, Optional<Long>> hllCountFunction =
>>>     new Function2<List<String>, Optional<Long>, Optional<Long>>() {
>>>   public Optional<Long> call(List<String> values, Optional<Long> state)
>>>       throws Exception {
>>>     values.stream().forEach(value -> hll.offer(value));
>>>     long newState = state.isPresent() ? hll.cardinality() : 0;
>>>     return Optional.of(newState);
>>>   }
>>> };
>>>
>>>
>>> And this is a snippet of how I use the function:
>>>
>>> JavaPairDStream<String, Long> hllCounts = fullvisitorids.mapToPair(
>>>     new PairFunction<String, String, String>() {
>>>   public Tuple2<String, String> call(String value) {
>>>     return new Tuple2<String, String>("key", value);
>>>   }
>>> }).updateStateByKey(hllCountFunction);
>>>
>>> After a lot of research I found the concept of Accumulators. Do I need to
>>> specify a custom Accumulator by extending the Accumulator class (in Java)?
>>> I also read that in transformations these should only be used for
>>> debugging purposes...
>>>
>>> So how can I achieve using one globally defined HLL object in a Spark
>>> Streaming transformation? I also tried to implement a custom Accumulator,
>>> but this also failed because I don't get how to use the AccumulableParam
>>> interface. I implemented the Accumulator and overrode the add and value
>>> methods. But what do I have to do in the AccumulableParam with
>>> addAccumulator, addInPlace and zero?
>>>
>>> Thanks in advance for your help and your advice!
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Sharing-object-state-accross-transformations-tp25544.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>>
>>
>