Hi,

I use Spark 2.0.2 and want to do the following:

I extract features in a streaming job and then apply the records to a
k-means model. Some of the features are simple ones that are calculated
directly from the record. But I also have more complex features that
depend on records from a preceding time window: they count how many
connections in the last second went to the same host or service as the
current one. I decided to use the SQL window functions for this.

So I build window specifications:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, desc}

val hostCountWindow = Window.partitionBy("plainrecord.ip_dst")
  .orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
val serviceCountWindow = Window.partitionBy("service")
  .orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

And a function that is called on every batch to extract these features:

import org.apache.spark.sql.{Dataset, Row}

def extractTrafficFeatures(dataset: Dataset[Row]) = {
  dataset
    .withColumn("host_count", count(dataset("plainrecord.ip_dst")).over(hostCountWindow))
    .withColumn("srv_count", count(dataset("service")).over(serviceCountWindow))
}
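
To make the intent concrete, here is a small self-contained sketch of the
same window logic applied to a static batch. The data, the flat column
names, and the local SparkSession are made up just for illustration; they
are not part of the real job:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, desc}

object WindowFeatureExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("window-feature-example")
      .getOrCreate()
    import spark.implicits._

    // Tiny made-up batch: (timestamp in epoch seconds, destination ip, service)
    val batch = Seq(
      (100L, "10.0.0.1", "http"),
      (100L, "10.0.0.1", "http"),
      (101L, "10.0.0.1", "dns"),
      (101L, "10.0.0.2", "http")
    ).toDF("timestamp", "ip_dst", "service")

    // Same window logic as above, but on flat column names
    val hostWindow = Window.partitionBy("ip_dst")
      .orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
    val srvWindow = Window.partitionBy("service")
      .orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

    batch
      .withColumn("host_count", count($"ip_dst").over(hostWindow))
      .withColumn("srv_count", count($"service").over(srvWindow))
      .show()

    spark.stop()
  }
}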

And I use this function as follows:

stream.map(...).map(...).foreachRDD { rdd =>
  val dataframe = rdd.toDF(featureHeaders: _*)
    .transform(extractTrafficFeatures)
  ...
}

The problem is that the performance is very bad. A batch needs between
1 and 3 seconds for an average input rate of less than 100 records per
second. I guess this comes from the partitioning, which produces a lot of
shuffling? Is there a better way to calculate these features on the
streaming data? Or am I doing something wrong here?

Thank you for your help.
