[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2021-01-05 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17259399#comment-17259399
 ] 

Jungtaek Lim edited comment on SPARK-33635 at 1/6/21, 4:18 AM:
---

I've spent some time tracing the issue, and noticed that SPARK-29054 (plus 
SPARK-30495) caused the performance regression (though the patch itself is doing 
the right thing).

{code}
  private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
    if (!_consumer.isDefined) {
      retrieveConsumer()
    }
    require(_consumer.isDefined, "Consumer must be defined")
    if (KafkaTokenUtil.needTokenUpdate(SparkEnv.get.conf, _consumer.get.kafkaParamsWithSecurity,
        _consumer.get.clusterConfig)) {
      logDebug("Cached consumer uses an old delegation token, invalidating.")
      releaseConsumer()
      consumerPool.invalidateKey(cacheKey)
      fetchedDataPool.invalidate(cacheKey)
      retrieveConsumer()
    }
    _consumer.get
  }
{code}

{code}
  def needTokenUpdate(
      sparkConf: SparkConf,
      params: ju.Map[String, Object],
      clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
    if (HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka") &&
        clusterConfig.isDefined && params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
      logDebug("Delegation token used by connector, checking if uses the latest token.")
      val connectorJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
      getTokenJaasParams(clusterConfig.get) != connectorJaasParams
    } else {
      false
    }
  }
{code}

{code}
  def isServiceEnabled(sparkConf: SparkConf, serviceName: String): Boolean = {
    val key = providerEnabledConfig.format(serviceName)

    deprecatedProviderEnabledConfigs.foreach { pattern =>
      val deprecatedKey = pattern.format(serviceName)
      if (sparkConf.contains(deprecatedKey)) {
        logWarning(s"${deprecatedKey} is deprecated. Please use ${key} instead.")
      }
    }

    val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
      sparkConf
        .getOption(pattern.format(serviceName))
        .map(_.toBoolean)
        .getOrElse(true)
    }

    sparkConf
      .getOption(key)
      .map(_.toBoolean)
      .getOrElse(isEnabledDeprecated)
  }
{code}

With my test data and the default config, Spark pulled 500 records per poll from 
Kafka, which ended up with 10,280,000 calls to get(), each of which calls 
getOrRetrieveConsumer(). A single call to KafkaTokenUtil.needTokenUpdate() 
wouldn't add significant overhead, but 10,000,000 calls make a significant 
difference. In the case where the delegation token is not used, 
HadoopDelegationTokenManager.isServiceEnabled is the culprit for this huge 
overhead.

We could probably resolve the issue via a short-term solution and longer-term 
solutions.

* short-term solution: change the order of the checks in needTokenUpdate so that 
the performance hit only applies when delegation tokens are actually in use. 
I'll raise a PR shortly; a sketch of the reordering is shown below.
* long-term solution(s): 1) optimize 
HadoopDelegationTokenManager.isServiceEnabled, 2) find a way to reduce how often 
the need for a token update is checked.
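
To make the short-term direction concrete, here is a minimal sketch of the 
reordered check, based on the needTokenUpdate method quoted above (an 
illustration only, not the actual PR):

{code}
// Sketch: run the cheap, connector-local checks first so that the expensive
// HadoopDelegationTokenManager.isServiceEnabled call (which scans SparkConf on
// every invocation) is only reached when a delegation token is actually
// configured for this connector.
def needTokenUpdate(
    sparkConf: SparkConf,
    params: ju.Map[String, Object],
    clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
  if (clusterConfig.isDefined &&
      params.containsKey(SaslConfigs.SASL_JAAS_CONFIG) &&
      HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka")) {
    logDebug("Delegation token used by connector, checking if uses the latest token.")
    val connectorJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
    getTokenJaasParams(clusterConfig.get) != connectorJaasParams
  } else {
    false
  }
}
{code}

With this ordering, the common non-delegation-token path short-circuits on the 
clusterConfig / SASL JAAS checks and never reaches the SparkConf scan.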

Note that even with the short-term solution, a slight performance hit is still 
observed, since the code path does more work than it did in Spark 2.4. I'd 
ignore it if the impact is small - say, less than 1%, or even slightly higher - 
given that the added code is mandatory.
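
For completeness, a rough illustration of long-term option 1): the result of the 
service-enabled lookup could be cached so the SparkConf scan is not repeated on 
every fetch. This is only a sketch; it assumes the delegation-token related 
configuration does not change while the executor is running, and the names are 
illustrative, not actual Spark code.

{code}
// Sketch only: memoize the "is the kafka delegation token service enabled"
// answer instead of re-scanning SparkConf on every call.
@volatile private var kafkaTokenServiceEnabled: Option[Boolean] = None

private def isKafkaTokenServiceEnabled(sparkConf: SparkConf): Boolean = {
  kafkaTokenServiceEnabled.getOrElse {
    val enabled = HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka")
    kafkaTokenServiceEnabled = Some(enabled)
    enabled
  }
}
{code}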


[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-29 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17256046#comment-17256046
 ] 

David Wyles edited comment on SPARK-33635 at 12/29/20, 8:48 PM:


[~gsomogyi] I now have my results.
I was so unhappy about these results that I ran all the tests again; the only 
thing that changed between runs is the version of Spark running on the cluster. 
Everything else was static - the data input from Kafka was an unchanging, static 
set of data.

Input-> *672733262* rows

+*Spark 2.4.5*:+

*440* seconds - *1,528,939* rows per second.

+*Spark 3.0.1*:+

*990* seconds - *679,528* rows per second.

These are multiple runs (I even took the best from spark 3.0.1)

I also captured the event logs between these two versions of spark - should 
anyone find them useful.

[event 
logs|https://drive.google.com/drive/folders/1aElmzVWmJqRALQimdOYxdJu559_3EX_9?usp=sharing]

So, no matter what I do, I can only conclude that Spark 2.4.5 was a lot faster 
in this test case.

Is Spark SQL reading the source data twice, just as it would if there was an 
"order by" in the query?

Sample code used:

{code}
val spark =
  SparkSession.builder.appName("Kafka Read Performance")
    .config("spark.executor.memory", "16g")
    .config("spark.cores.max", "10")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "file:///tmp/spark-events")
    .config("spark.eventLog.overwrite", "true")
    .getOrCreate()

import spark.implicits._

val startTime = System.nanoTime()

val df =
  spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", config.brokers)
    .option("subscribe", config.inTopic)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()

df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", config.brokers)
  .option("topic", config.outTopic)
  .mode(SaveMode.Append)
  .save()

val endTime = System.nanoTime()

val elapsedSecs = (endTime - startTime) / 1E9

// static input sample was used, fixed row count.
println(s"Took $elapsedSecs secs")
spark.stop()
{code}

 




[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-29 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17256046#comment-17256046
 ] 

David Wyles edited comment on SPARK-33635 at 12/29/20, 4:54 PM:


[~gsomogyi] I now have my results.
 I was so unhappy about these results I ran all the tests again, the only thing 
that changed between them is the version of spark running on the cluster, 
everything else was static - the data input from kafka was an unchanging static 
set of data.

Input-> *672733262* rows

+*Spark 2.4.5*:+

*440* seconds - *1,528,939* rows per second.

+*Spark 3.0.1*:+

*990* seconds - *679,528* rows per seconds.

These are multiple runs (I even took the best from sprak 3.0.1)

I also captured the event logs between these two versions of spark - should 
anyone find them useful.

[event 
logs|https://drive.google.com/drive/folders/1aElmzVWmJqRALQimdOYxdJu559_3EX_9?usp=sharing]

So, no matter what I do, I can only conclude that Spark 2.4.5 was a lot faster 
in this test case.

Is Spark SQL reading the source data twice, just as it would if there was a 
"order by" in the query?

Sample code used:

{code}
val spark =
  SparkSession.builder.appName("Kafka Read Performance")
    .config("spark.executor.memory", "16g")
    .config("spark.cores.max", "10")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "file:///tmp/spark-events")
    .config("spark.eventLog.overwrite", "true")
    .getOrCreate()

import spark.implicits._

val startTime = System.nanoTime()

val df =
  spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", config.brokers)
    .option("subscribe", config.inTopic)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()

df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", config.brokers)
  .option("topic", config.outTopic)
  .mode(SaveMode.Append)
  .save()

val endTime = System.nanoTime()

val elapsedSecs = (endTime - startTime) / 1E9

// static input sample was used, fixed row count.
println(s"Took $elapsedSecs secs")
spark.stop()
{code}

 




[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-29 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17256046#comment-17256046
 ] 

David Wyles edited comment on SPARK-33635 at 12/29/20, 4:34 PM:


[~gsomogyi] I now have my results.
 I was so unhappy about these results I ran all the tests again, the only thing 
that changed between them is the version of spark running on the cluster, 
everything else was static - the data input from kafka was an unchanging static 
set of data.

Input-> *672733262* rows

+*Spark 2.4.5*:+

*440* seconds - *1,528,939* rows per second.

+*Spark 3.0.1*:+

*990* seconds - *679,528* rows per seconds.

These are multiple runs (I even took the best from sprak 3.0.1)

I also captured the event logs between these two versions of spark - should 
anyone find them useful.

[event 
logs|https://drive.google.com/drive/folders/1aElmzVWmJqRALQimdOYxdJu559_3EX_9?usp=sharing]

So, no matter what I do, I can only conclude that Spark 2.4.5 was a lot faster 
in this test case (in my production use case I'm just writing to Parquet files 
in HDFS - which is where I noticed the degradation in performance).

Is Spark SQL reading the source data twice, just as it would if there was a 
"order by" in the query?

Sample code used:

{code}
val spark =
  SparkSession.builder.appName("Kafka Read Performance")
    .config("spark.executor.memory", "16g")
    .config("spark.cores.max", "10")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "file:///tmp/spark-events")
    .config("spark.eventLog.overwrite", "true")
    .getOrCreate()

import spark.implicits._

val startTime = System.nanoTime()

val df =
  spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", config.brokers)
    .option("subscribe", config.inTopic)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()

df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", config.brokers)
  .option("topic", config.outTopic)
  .mode(SaveMode.Append)
  .save()

val endTime = System.nanoTime()

val elapsedSecs = (endTime - startTime) / 1E9

// static input sample was used, fixed row count.
println(s"Took $elapsedSecs secs")
spark.stop()
{code}

 




[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-11 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17247875#comment-17247875
 ] 

David Wyles edited comment on SPARK-33635 at 12/11/20, 12:12 PM:
-

I'll give all those a go and get back to you.

 

The collect in this test case is only 13 items of data after the group by - so 
I know that's not going to impact it.

But I can modify it to just read and write to Kafka.



> Performance regression in Kafka read
> 
>
> Key: SPARK-33635
> URL: https://issues.apache.org/jira/browse/SPARK-33635
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.0.1
> Environment: A simple 5 node system. A simple data row of csv data in 
> kafka, evenly distributed between the partitions.
> Open JDK 1.8.0.252
> Spark in stand alone - 5 nodes, 10 workers (2 worker per node, each locked to 
> a distinct NUMA group)
> kafka (v 2.3.1) cluster - 5 nodes (1 broker per node).
> Centos 7.7.1908
> 1 topic, 10 partitions, 1 hour queue life
> (this is just one of the clusters we have, I have tested on all of them and 
> they all exhibit the same performance degradation)
>Reporter: David Wyles
>Priority: Major
>
> I have observed a slowdown in the reading of data from Kafka on all of our 
> systems when migrating from Spark 2.4.5 to Spark 3.0.0 (and Spark 3.0.1).
> I have created a sample project to isolate the problem as much as possible, 
> with just a read of all data from a Kafka topic (see 
> [https://github.com/codegorillauk/spark-kafka-read] ).
> With 2.4.5, across multiple runs, 
>  I get a stable read rate of 1,120,000 (1.12 million) rows per second.
> With 3.0.0 or 3.0.1, across multiple runs,
>  I get a stable read rate of 632,000 (0.632 million) rows per second.
> This represents a *44% loss in performance*, which is a lot.
> I have been working through the spark-sql-kafka-0-10 code base, but changes for 
> Spark 3 have been ongoing for over a year and it's difficult to pinpoint an 
> exact change or reason for the degradation.
> I am happy to help fix this problem, but will need some assistance as I am 
> unfamiliar with the spark-sql-kafka-0-10 project.
>  
> A sample of the data my test reads (note: it's not parsing CSV - this is just 
> test data)
>  
> 160692180,001e0610e532,lightsense,tsl250rd,intensity,21853,53.262,acceleration_z,651,ep,290,commit,913,pressure,138,pm1,799,uv_intensity,823,idletime,-372,count,-72,ir_intensity,185,concentration,-61,flags,-532,tx,694.36,ep_heatsink,-556.92,acceleration_x,-221.40,fw,910.53,sample_flow_rate,-959.60,uptime,-515.15,pm10,-768.03,powersupply,214.72,magnetic_field_y,-616.04,alphasense,606.73,AoT_Chicago,053,Racine
>  Ave & 18th St Chicago IL,41.857959,-87.6564270002,AoT Chicago (S) 
> [C],2017/12/15 00:00:00,






[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-11 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17247875#comment-17247875
 ] 

David Wyles edited comment on SPARK-33635 at 12/11/20, 12:11 PM:
-

I'll give all those a go and get back to you.

 

The collect in this test case is only 13 items of data - so I know thats not 
going to impact it.


was (Author: david.wyles):
I'll give all those a go and get back to you.







[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-11 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17247875#comment-17247875
 ] 

David Wyles edited comment on SPARK-33635 at 12/11/20, 12:11 PM:
-

I'll give all those a go and get back to you.

 

The collect in this test case is only 13 items of data after the group by - so 
I know thats not going to impact it.

But I can modify it to just read and write to kafka.


was (Author: david.wyles):
I'll give all those a go and get back to you.

 

The collect in this test case is only 13 items of data - so I know thats not 
going to impact it.







[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-09 Thread Gabor Somogyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246643#comment-17246643
 ] 

Gabor Somogyi edited comment on SPARK-33635 at 12/9/20, 4:39 PM:
-

{quote}I no longer believe this is a true regression in performance, I now 
think that 2.4.5 was "cheating".
{quote}
If by cheating you mean that Spark uses one consumer from multiple threads, then 
the answer is no. Using a Kafka consumer from multiple threads is strictly 
forbidden. If such a thing happens, Kafka detects it and an exception is thrown, 
which stops the query immediately.
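
For illustration only (not from the Spark code base; the broker address and 
topic below are placeholders), this is roughly what happens if a single consumer 
is shared across threads - the Kafka client guards every public call and fails 
with ConcurrentModificationException rather than allowing concurrent access:

{code}
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object SharedConsumerDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("group.id", "shared-consumer-demo")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("demo-topic")) // placeholder topic

    // Two threads polling the same consumer instance: the overlapping call
    // typically fails with java.util.ConcurrentModificationException
    // ("KafkaConsumer is not safe for multi-threaded access").
    def poller(): Thread = new Thread(new Runnable {
      override def run(): Unit = consumer.poll(Duration.ofSeconds(10))
    })

    val t1 = poller()
    val t2 = poller()
    t1.start(); t2.start()
    t1.join(); t2.join()
  }
}
{code}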


was (Author: gsomogyi):
{quote}I no longer believe this is a true regression in performance, I now 
think that 2.4.5 was "cheating".
{quote}
If you mean by cheating Spark uses one consumer from multiple threads then the 
answer is no. Kafka consumer is strictly forbidden to use from multiple threads.
 If such thing happens then Kafka realizes it and exception will be throws 
which will stop the query immediately.







[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-07 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245329#comment-17245329
 ] 

David Wyles edited comment on SPARK-33635 at 12/7/20, 4:41 PM:
---

Apart from all the cached-consumer changes and other things in 
spark-sql-kafka-0-10, there was the Kafka client change: it went from 2.0.0 to 
2.4.1.

This I can test, using just kafka libraries. I'll give it a go and see what the 
outcome is.


was (Author: david.wyles):
The diffs on that library between 2.4.5 and 3.0.0 are very small (that I can 
see) so the only thing changing directly in the library is the Kafka version 
used. 

Kafka client went from 2.0.0 to 2.4.1.

So would you suspect a kafka client library issue with my 2.3.1 kafka cluster?

This I can test, using just kafka libraries. I'll give it a go and see what the 
outcome is.







[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-07 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245329#comment-17245329
 ] 

David Wyles edited comment on SPARK-33635 at 12/7/20, 4:33 PM:
---

The diffs on that library between 2.4.5 and 3.0.0 are very small (that I can 
see) so the only thing changing directly in the library is the Kafka version 
used. 

Kafka client went from 2.0.0 to 2.4.1.

So would you suspect a kafka client library issue with my 2.3.1 kafka cluster?

This I can test, using just kafka libraries. I'll give it a go and see what the 
outcome is.


was (Author: david.wyles):
The diffs on that library between 2.4.5 and 3.0.0 are very small (that I can 
see) so the only thing changing directly in the library is the Kafka version 
used. 

Kafka client went from 2.0.0 to 2.4.1.

So would you suspect a kafka client library issue with my 2.3.1 kafka cluster?







[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-07 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245329#comment-17245329
 ] 

David Wyles edited comment on SPARK-33635 at 12/7/20, 4:32 PM:
---

The diffs on that library between 2.4.5 and 3.0.0 are very small (that I can 
see) so the only thing changing directly in the library is the Kafka version 
used. 

Kafka client went from 2.0.0 to 2.4.1.

So would you suspect a kafka client library issue with my 2.3.1 kafka cluster?


was (Author: david.wyles):
The diffs on that library between 2.4.5 and 3.0.0 are very small (that I can 
see) so the only thing changing directly in the library is the Kafka version 
used. 

So would you suspect a kafka client library issue with my 2.3.1 kafka cluster?







[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-07 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245304#comment-17245304
 ] 

David Wyles edited comment on SPARK-33635 at 12/7/20, 4:20 PM:
---

Fair point - the library I suspect is spark-sql-kafka-0-10; is that covered by 
these issues?


was (Author: david.wyles):
Fair point, the library I suspect is spark-sql-kafka-0-10, is that covered on 
these issues.

Like I said, 3.0 was in development for a long time, the changes in that 
library span over a year - its difficult to narrow down what specific change it 
was, without going though every single change iteration over that time.

I would have thought the spark team might be concerned that their kafka reader 
is taking a performance hit.

Is there an alternative to that library, all your docs say to use that library.







[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-07 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245304#comment-17245304
 ] 

David Wyles edited comment on SPARK-33635 at 12/7/20, 4:00 PM:
---

Fair point - the library I suspect is spark-sql-kafka-0-10; is that covered by 
these issues?

Like I said, 3.0 was in development for a long time and the changes in that 
library span over a year - it's difficult to narrow down what specific change it 
was without going through every single change iteration over that time.

I would have thought the Spark team might be concerned that their Kafka reader 
is taking a performance hit.

Is there an alternative to that library? All your docs say to use that library.


was (Author: david.wyles):
Fair point, the library I suspect is spark-sql-kafka-0-10, is that covered on 
these issues.

Like I said, 3.0 was in development for a long time, the changes in that 
library span over a year - its difficult to narrow down what specific change it 
was, without going though every single change iteration over that time.

I would have thought the spark team might be concerned that their kafka reader 
is taking a performance hit.

Is there an alternative to that library, all you docs say to use that library.







[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read

2020-12-07 Thread David Wyles (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245304#comment-17245304
 ] 

David Wyles edited comment on SPARK-33635 at 12/7/20, 3:53 PM:
---

Fair point - the library I suspect is spark-sql-kafka-0-10; is that covered by 
these issues?

Like I said, 3.0 was in development for a long time and the changes in that 
library span over a year - it's difficult to narrow down what specific change it 
was without going through every single change iteration over that time.

I would have thought the Spark team might be concerned that their Kafka reader 
is taking a performance hit.

Is there an alternative to that library? All your docs say to use that library.


was (Author: david.wyles):
Fair point, the library I suspect is spark-sql-kafka-0-10, is that covered on 
these issues.

Like I said, 3.0 was in development for a long time, the changes in that 
library span over a year - its difficult to narrow down what specific change it 
was, without going though every single change iteration over that time.

I would have though the spark team might be concerned that their kafka reader 
is taking a performance hit.

Is there an alternative to that library, all you docs say to use that library.



