Hi, I use Spark 2.0.2 and want to do the following:
I extract features in a streaming job and then apply the records to a k-means model. Some of the features are simple ones which are calculated directly from the record. But I also have more complex features which depend on records from a specified time window before: they count how many connections in the last second went to the same host or service as the current one. I decided to use the SQL window functions for this.

So I build window specifications:

    val hostCountWindow = Window.partitionBy("plainrecord.ip_dst")
      .orderBy(desc("timestamp"))
      .rangeBetween(-1L, 0L)

    val serviceCountWindow = Window.partitionBy("service")
      .orderBy(desc("timestamp"))
      .rangeBetween(-1L, 0L)

And a function which is called to extract these features on every batch:

    def extractTrafficFeatures(dataset: Dataset[Row]) = {
      dataset
        .withColumn("host_count", count(dataset("plainrecord.ip_dst")).over(hostCountWindow))
        .withColumn("srv_count", count(dataset("service")).over(serviceCountWindow))
    }

And I use this function as follows:

    stream.map(...).map(...).foreachRDD { rdd =>
      val dataframe = rdd.toDF(featureHeaders: _*).transform(extractTrafficFeatures(_))
      ...
    }

The problem is that this has very bad performance: a batch needs between 1 and 3 seconds at an average input rate of less than 100 records per second. I guess it comes from the partitioning, which produces a lot of shuffling?

Is there a better way to calculate these features on the streaming data, or am I doing something wrong here?

Thank you for your help.
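P.S. In case a self-contained snippet helps reproduce this, below is a stripped-down sketch of what a single micro-batch does, runnable in the spark-shell. The schema and sample rows are invented for illustration (I flattened plainrecord.ip_dst to a plain ip_dst column), and it assumes timestamp holds epoch seconds, since rangeBetween(-1L, 0L) operates on the values of the orderBy column:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{count, desc}
    import spark.implicits._  // `spark` is the SparkSession provided by the shell

    // One second of "history" per partition; rangeBetween works on the
    // values of the orderBy column, so timestamp must be numeric (epoch seconds).
    val hostCountWindow = Window.partitionBy("ip_dst")
      .orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
    val serviceCountWindow = Window.partitionBy("service")
      .orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

    def extractTrafficFeatures(df: DataFrame): DataFrame =
      df.withColumn("host_count", count(df("ip_dst")).over(hostCountWindow))
        .withColumn("srv_count", count(df("service")).over(serviceCountWindow))

    // Stand-in for one micro-batch: (timestamp, ip_dst, service)
    val batch = Seq(
      (1000L, "10.0.0.1", "http"),
      (1000L, "10.0.0.2", "http"),
      (1001L, "10.0.0.1", "dns")
    ).toDF("timestamp", "ip_dst", "service")

    batch.transform(extractTrafficFeatures).show()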