[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read
[ https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17259399#comment-17259399 ] Jungtaek Lim edited comment on SPARK-33635 at 1/6/21, 4:18 AM:
---
I've spent some time tracing the issue, and noticed that SPARK-29054 (plus SPARK-30495) caused the performance regression (though the patch itself is doing the right thing).

{code}
private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
  if (!_consumer.isDefined) {
    retrieveConsumer()
  }
  require(_consumer.isDefined, "Consumer must be defined")
  if (KafkaTokenUtil.needTokenUpdate(SparkEnv.get.conf,
      _consumer.get.kafkaParamsWithSecurity, _consumer.get.clusterConfig)) {
    logDebug("Cached consumer uses an old delegation token, invalidating.")
    releaseConsumer()
    consumerPool.invalidateKey(cacheKey)
    fetchedDataPool.invalidate(cacheKey)
    retrieveConsumer()
  }
  _consumer.get
}
{code}
{code}
def needTokenUpdate(
    sparkConf: SparkConf,
    params: ju.Map[String, Object],
    clusterConfig: Option[KafkaTokenClusterConf]): Boolean = {
  if (HadoopDelegationTokenManager.isServiceEnabled(sparkConf, "kafka") &&
      clusterConfig.isDefined &&
      params.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) {
    logDebug("Delegation token used by connector, checking if uses the latest token.")
    val connectorJaasParams = params.get(SaslConfigs.SASL_JAAS_CONFIG).asInstanceOf[String]
    getTokenJaasParams(clusterConfig.get) != connectorJaasParams
  } else {
    false
  }
}
{code}
{code}
def isServiceEnabled(sparkConf: SparkConf, serviceName: String): Boolean = {
  val key = providerEnabledConfig.format(serviceName)

  deprecatedProviderEnabledConfigs.foreach { pattern =>
    val deprecatedKey = pattern.format(serviceName)
    if (sparkConf.contains(deprecatedKey)) {
      logWarning(s"${deprecatedKey} is deprecated. Please use ${key} instead.")
    }
  }

  val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
    sparkConf
      .getOption(pattern.format(serviceName))
      .map(_.toBoolean)
      .getOrElse(true)
  }

  sparkConf
    .getOption(key)
    .map(_.toBoolean)
    .getOrElse(isEnabledDeprecated)
}
{code}

With my test data and default config, Spark pulled 500 records per poll from Kafka, which ended up in 10,280,000 calls to get(), each of which calls getOrRetrieveConsumer(). A single call to KafkaTokenUtil.needTokenUpdate() wouldn't add significant overhead, but 10,000,000 calls make a significant difference. In the case where delegation tokens are not used, HadoopDelegationTokenManager.isServiceEnabled is the culprit behind this huge overhead.

We could probably resolve the issue via a short-term solution and a long-term solution:
* short-term solution: change the order of the checks in needTokenUpdate, so that the performance hit only applies when delegation tokens are in use. I'll raise a PR shortly.
* long-term solution(s): 1) optimize HadoopDelegationTokenManager.isServiceEnabled, 2) find a way to reduce how often the need for a token update is checked.

Note that even with the short-term solution, a slight performance hit is observed, as this code path still does more than it did in Spark 2.4. Though I'd ignore it if the impact is slight, like less than 1%, or even slightly higher, given the code addition is mandatory.
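The short-term solution described above boils down to short-circuit evaluation: putting the cheap per-consumer checks before the expensive config scan means the scan only runs when a delegation token is actually configured. A minimal, self-contained sketch of that idea (ReorderDemo, its stand-in isServiceEnabled, and the call counter are illustrative stand-ins, not Spark code):

```scala
// Toy model of the hot path: one expensive global check vs. cheap local checks.
// With short-circuit &&, reordering makes the expensive check run only when
// the cheap conditions already hold. A counter shows how often it runs.
object ReorderDemo {
  var expensiveCalls = 0

  // Stand-in for HadoopDelegationTokenManager.isServiceEnabled's config scan.
  def isServiceEnabled(): Boolean = { expensiveCalls += 1; false }

  // Original order: expensive check first, so it executes on every call.
  def needTokenUpdateOld(clusterConfig: Option[String]): Boolean =
    isServiceEnabled() && clusterConfig.isDefined

  // Reordered: cheap Option check first; the expensive check is skipped
  // entirely when no delegation token is configured.
  def needTokenUpdateNew(clusterConfig: Option[String]): Boolean =
    clusterConfig.isDefined && isServiceEnabled()

  def main(args: Array[String]): Unit = {
    val calls = 1000000
    expensiveCalls = 0
    (1 to calls).foreach(_ => needTokenUpdateOld(None))
    println(s"old order: expensive check ran $expensiveCalls times") // 1000000
    expensiveCalls = 0
    (1 to calls).foreach(_ => needTokenUpdateNew(None))
    println(s"new order: expensive check ran $expensiveCalls times") // 0
  }
}
```

On a get() path invoked ten million times, this reordering removes the config scan entirely for the common no-token case, which matches the short-term fix proposed in the comment.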
[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read
[ https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17256046#comment-17256046 ] David Wyles edited comment on SPARK-33635 at 12/29/20, 8:48 PM:
[~gsomogyi] I now have my results. I was so unhappy about these results that I ran all the tests again. The only thing that changed between them is the version of Spark running on the cluster; everything else was static - the data input from Kafka was an unchanging, static set of data.

Input: *672733262* rows
+*Spark 2.4.5*:+ *440* seconds - *1,528,939* rows per second.
+*Spark 3.0.1*:+ *990* seconds - *679,528* rows per second.

These are multiple runs (I even took the best from Spark 3.0.1).

I also captured the event logs for these two versions of Spark, should anyone find them useful: [event logs|https://drive.google.com/drive/folders/1aElmzVWmJqRALQimdOYxdJu559_3EX_9?usp=sharing]

So, no matter what I do, I can only conclude that Spark 2.4.5 was a lot faster in this test case. Is Spark SQL reading the source data twice, just as it would if there were an "order by" in the query?

Sample code used:
{code}
val spark = SparkSession.builder.appName("Kafka Read Performance")
  .config("spark.executor.memory", "16g")
  .config("spark.cores.max", "10")
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "file:///tmp/spark-events")
  .config("spark.eventLog.overwrite", "true")
  .getOrCreate()

import spark.implicits._

val startTime = System.nanoTime()

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", config.brokers)
  .option("subscribe", config.inTopic)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", config.brokers)
  .option("topic", config.outTopic)
  .mode(SaveMode.Append)
  .save()

val endTime = System.nanoTime()
val elapsedSecs = (endTime - startTime) / 1E9
// static input sample was used, fixed row count
println(s"Took $elapsedSecs secs")
spark.stop()
{code}
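As a sanity check, the reported rates follow directly from the fixed row count and the wall-clock times. A small sketch of the arithmetic (object and method names are illustrative; the numbers are taken from the comment above):

```scala
// Recompute the reported throughput figures from the raw benchmark numbers.
object ThroughputCheck {
  val rows = 672733262L // fixed input row count from the test

  // rows per second for a run that took the given number of seconds
  def rate(seconds: Long): Long = rows / seconds

  def main(args: Array[String]): Unit = {
    println(s"Spark 2.4.5: ${rate(440)} rows/sec") // 1528939
    println(s"Spark 3.0.1: ${rate(990)} rows/sec") // 679528
    println(s"slowdown factor: ${990.0 / 440.0}x") // 2.25x
  }
}
```

Both computed rates match the figures quoted in the comment, and the elapsed-time ratio (990 s vs. 440 s) corresponds to a 2.25x slowdown, consistent with the ~44% throughput loss reported in the issue description.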
[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read
[ https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17256046#comment-17256046 ] David Wyles edited comment on SPARK-33635 at 12/29/20, 4:34 PM:
[~gsomogyi] I now have my results. I was so unhappy about these results that I ran all the tests again. The only thing that changed between them is the version of Spark running on the cluster; everything else was static - the data input from Kafka was an unchanging, static set of data.

Input: *672733262* rows
+*Spark 2.4.5*:+ *440* seconds - *1,528,939* rows per second.
+*Spark 3.0.1*:+ *990* seconds - *679,528* rows per second.

These are multiple runs (I even took the best from Spark 3.0.1).

I also captured the event logs for these two versions of Spark, should anyone find them useful: [event logs|https://drive.google.com/drive/folders/1aElmzVWmJqRALQimdOYxdJu559_3EX_9?usp=sharing]

So, no matter what I do, I can only conclude that Spark 2.4.5 was a lot faster in this test case. (In my production use case I'm just writing to parquet files in HDFS, which is where I noticed the degradation in performance.) Is Spark SQL reading the source data twice, just as it would if there were an "order by" in the query?

Sample code used:
{code}
val spark = SparkSession.builder.appName("Kafka Read Performance")
  .config("spark.executor.memory", "16g")
  .config("spark.cores.max", "10")
  .config("spark.eventLog.enabled", "true")
  .config("spark.eventLog.dir", "file:///tmp/spark-events")
  .config("spark.eventLog.overwrite", "true")
  .getOrCreate()

import spark.implicits._

val startTime = System.nanoTime()

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", config.brokers)
  .option("subscribe", config.inTopic)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .load()

df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", config.brokers)
  .option("topic", config.outTopic)
  .mode(SaveMode.Append)
  .save()

val endTime = System.nanoTime()
val elapsedSecs = (endTime - startTime) / 1E9
// static input sample was used, fixed row count
println(s"Took $elapsedSecs secs")
spark.stop()
{code}
[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read
[ https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17247875#comment-17247875 ] David Wyles edited comment on SPARK-33635 at 12/11/20, 12:12 PM:
---
I'll give all those a go and get back to you. The collect in this test case is only 13 items of data after the group by, so I know that's not going to impact it. But I can modify it to just read and write to Kafka.

> Performance regression in Kafka read
> -----
> Key: SPARK-33635
> URL: https://issues.apache.org/jira/browse/SPARK-33635
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0, 3.0.1
> Environment: A simple 5 node system. A simple data row of csv data in kafka, evenly distributed between the partitions.
> Open JDK 1.8.0.252
> Spark in stand-alone mode - 5 nodes, 10 workers (2 workers per node, each locked to a distinct NUMA group)
> kafka (v 2.3.1) cluster - 5 nodes (1 broker per node).
> Centos 7.7.1908
> 1 topic, 10 partitions, 1 hour queue life
> (this is just one of the clusters we have; I have tested on all of them and they all exhibit the same performance degradation)
> Reporter: David Wyles
> Priority: Major
>
> I have observed a slowdown in the reading of data from kafka on all of our systems when migrating from Spark 2.4.5 to Spark 3.0.0 (and Spark 3.0.1).
> I have created a sample project to isolate the problem as much as possible, which just reads all data from a kafka topic (see [https://github.com/codegorillauk/spark-kafka-read]).
> With 2.4.5, across multiple runs, I get a stable read rate of 1,120,000 (1.12 mil) rows per second.
> With 3.0.0 or 3.0.1, across multiple runs, I get a stable read rate of 632,000 (0.632 mil) rows per second.
> This represents a *44% loss in performance*. Which is, a lot.
> I have been working through the spark-sql-kafka-0-10 code base, but changes for Spark 3 have been ongoing for over a year and it's difficult to pinpoint an exact change or reason for the degradation.
> I am happy to help fix this problem, but will need some assistance as I am unfamiliar with the spark-sql-kafka-0-10 project.
>
> A sample of the data my test reads (note: it's not parsing csv - this is just test data)
> 160692180,001e0610e532,lightsense,tsl250rd,intensity,21853,53.262,acceleration_z,651,ep,290,commit,913,pressure,138,pm1,799,uv_intensity,823,idletime,-372,count,-72,ir_intensity,185,concentration,-61,flags,-532,tx,694.36,ep_heatsink,-556.92,acceleration_x,-221.40,fw,910.53,sample_flow_rate,-959.60,uptime,-515.15,pm10,-768.03,powersupply,214.72,magnetic_field_y,-616.04,alphasense,606.73,AoT_Chicago,053,Racine Ave & 18th St Chicago IL,41.857959,-87.6564270002,AoT Chicago (S) [C],2017/12/15 00:00:00,

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read
[ https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246643#comment-17246643 ] Gabor Somogyi edited comment on SPARK-33635 at 12/9/20, 4:39 PM:
---
{quote}I no longer believe this is a true regression in performance, I now think that 2.4.5 was "cheating".
{quote}
If by "cheating" you mean that Spark uses one consumer from multiple threads, then the answer is no. Using a Kafka consumer from multiple threads is strictly forbidden. If that happens, Kafka detects it and an exception is thrown, which stops the query immediately.
[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read
[ https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245329#comment-17245329 ] David Wyles edited comment on SPARK-33635 at 12/7/20, 4:41 PM: --- Apart from all the cached consumer changes and other things on kafka-010-sql there was the kafka client change, it went from 2.0.0 to 2.4.1. This I can test, using just kafka libraries. I'll give it a go and see what the outcome is. was (Author: david.wyles): The diffs on that library between 2.4.5 and 3.0.0 are very small (that I can see) so the only thing changing directly in the library is the Kafka version used. Kafka client went from 2.0.0 to 2.4.1. So would you suspect a kafka client library issue with my 2.3.1 kafka cluster? This I can test, using just kafka libraries. I'll give it a go and see what the outcome is. > Performance regression in Kafka read > > > Key: SPARK-33635 > URL: https://issues.apache.org/jira/browse/SPARK-33635 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 3.0.0, 3.0.1 > Environment: A simple 5 node system. A simple data row of csv data in > kafka, evenly distributed between the partitions. > Open JDK 1.8.0.252 > Spark in stand alone - 5 nodes, 10 workers (2 worker per node, each locked to > a distinct NUMA group) > kafka (v 2.3.1) cluster - 5 nodes (1 broker per node). > Centos 7.7.1908 > 1 topic, 10 partiions, 1 hour queue life > (this is just one of clusters we have, I have tested on all of them and > theyall exhibit the same performance degredation) >Reporter: David Wyles >Priority: Major > > I have observed a slowdown in the reading of data from kafka on all of our > systems when migrating from spark 2.4.5 to Spark 3.0.0 (and Spark 3.0.1) > I have created a sample project to isolate the problem as much as possible, > with just a read all data from a kafka topic (see > [https://github.com/codegorillauk/spark-kafka-read] ). 
> With 2.4.5, across multiple runs, > I get a stable read rate of 1,120,000 (1.12 million) rows per second. > With 3.0.0 or 3.0.1, across multiple runs, > I get a stable read rate of 632,000 (0.632 million) rows per second. > That represents a *44% loss in performance*. Which is a lot. > I have been working through the spark-sql-kafka-0-10 code base, but changes for > Spark 3 have been ongoing for over a year and it's difficult to pinpoint an > exact change or reason for the degradation. > I am happy to help fix this problem, but will need some assistance as I am > unfamiliar with the spark-sql-kafka-0-10 project. > > A sample of the data my test reads (note: it's not parsing CSV - this is just > test data) > > 160692180,001e0610e532,lightsense,tsl250rd,intensity,21853,53.262,acceleration_z,651,ep,290,commit,913,pressure,138,pm1,799,uv_intensity,823,idletime,-372,count,-72,ir_intensity,185,concentration,-61,flags,-532,tx,694.36,ep_heatsink,-556.92,acceleration_x,-221.40,fw,910.53,sample_flow_rate,-959.60,uptime,-515.15,pm10,-768.03,powersupply,214.72,magnetic_field_y,-616.04,alphasense,606.73,AoT_Chicago,053,Racine > Ave & 18th St Chicago IL,41.857959,-87.6564270002,AoT Chicago (S) > [C],2017/12/15 00:00:00, -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
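The plain-client test proposed in the comment above can be sketched offline. The harness below is a hypothetical sketch, not code from the ticket: it separates the counting loop from the consumer so the rate math is checkable without a broker, and the commented-out wiring assumes the third-party kafka-python package, a broker at localhost:9092, and a topic name of my choosing. It also checks that the reported rates (1,120,000 vs 632,000 rows/s) do work out to roughly a 44% loss.

```python
import time

def measure_throughput(records, clock=time.monotonic):
    """Drain any iterable of records; return (count, elapsed_s, rows_per_s)."""
    start = clock()
    count = 0
    for _ in records:
        count += 1
    elapsed = clock() - start
    rate = (count / elapsed) if elapsed > 0 else float("inf")
    return count, elapsed, rate

# Hypothetical wiring to a real consumer (assumes kafka-python and a local broker):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer("test-topic",
#                          bootstrap_servers="localhost:9092",
#                          auto_offset_reset="earliest",
#                          consumer_timeout_ms=5000)
# print(measure_throughput(consumer))

# Sanity check on the numbers reported in the issue description:
old_rate, new_rate = 1_120_000, 632_000
loss_pct = (old_rate - new_rate) / old_rate * 100  # ~43.6, i.e. about 44%
```

Running the same harness against the 2.0.0 and 2.4.1 kafka-clients versions (only the dependency changed) would isolate whether the client upgrade alone explains the drop.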
[jira] [Comment Edited] (SPARK-33635) Performance regression in Kafka read
[ https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17245304#comment-17245304 ] David Wyles edited comment on SPARK-33635 at 12/7/20, 4:20 PM: --- Fair point - the library I suspect is spark-sql-kafka-0-10; is that covered by these issues? was (Author: david.wyles): Fair point - the library I suspect is spark-sql-kafka-0-10; is that covered by these issues? Like I said, 3.0 was in development for a long time and the changes in that library span over a year - it's difficult to narrow down which specific change it was without going through every single change iteration over that time. I would have thought the Spark team might be concerned that their Kafka reader is taking a performance hit. Is there an alternative to that library? All your docs say to use that library.