[jira] [Updated] (SPARK-22782) Boost speed, use kafka010 consumer kafka
[ https://issues.apache.org/jira/browse/SPARK-22782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

licun updated SPARK-22782:
--------------------------
Description:

We use Spark Structured Streaming to consume from Kafka, but we find the consumer is much slower than with Spark Streaming (DStreams). We set the Kafka source option "maxOffsetsPerTrigger" to 1. By adding a debug log to CachedKafkaConsumer.scala (kafka-0-10-sql), we found the following situation.

The debug code:

{code:scala}
private def poll(pollTimeoutMs: Long): Unit = {
  val startTime = System.currentTimeMillis()
  val p = consumer.poll(pollTimeoutMs)
  val r = p.records(topicPartition)
  val endTime = System.currentTimeMillis()
  val delta = endTime - startTime
  // Log how many records this poll of the partition returned and how long it took.
  logError(s"Polled $groupId ${p.partitions()}, size: ${r.size}, took: $delta ms")
  fetchedData = r.iterator
}
{code}

The log:

> Key: SPARK-22782
> URL: https://issues.apache.org/jira/browse/SPARK-22782
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.1.0, 2.2.0
> Environment: kafka-version: 0.10
>              spark-version: 2.2.0
>              schedule spark on yarn
> Reporter: licun
> Priority: Critical
--
This message was sent by Atlassian JIRA (v6.4.14#64029)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
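The measurement pattern in the debug code above can be reproduced standalone. This is a minimal sketch, assuming nothing from Spark: the `timed` helper and the sleep-based stand-in workload are illustrative inventions, not part of CachedKafkaConsumer.

```scala
object PollTiming {
  // Run a block and return its result together with the elapsed wall-clock millis,
  // mirroring the startTime/endTime bracketing in the debug code above.
  def timed[A](block: => A): (A, Long) = {
    val start = System.currentTimeMillis()
    val result = block
    (result, System.currentTimeMillis() - start)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for consumer.poll(pollTimeoutMs): sleep briefly, return fake records.
    val (records, ms) = timed {
      Thread.sleep(50)
      Seq("r1", "r2", "r3")
    }
    println(s"Polled size: ${records.size}, took: $ms ms")
  }
}
```

Wrapping the call in a helper like this keeps the timing logic in one place, rather than repeating the currentTimeMillis bracketing at every call site.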
[jira] [Created] (SPARK-22782) Boost speed, use kafka010 consumer kafka
licun created SPARK-22782:
--------------------------
Summary: Boost speed, use kafka010 consumer kafka
Key: SPARK-22782
URL: https://issues.apache.org/jira/browse/SPARK-22782
Project: Spark
Issue Type: Improvement
Components: Structured Streaming
Affects Versions: 2.2.0, 2.1.0
Environment: kafka-version: 0.10
             spark-version: 2.2.0
             schedule spark on yarn
Reporter: licun
Priority: Critical
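For reference, the "maxOffsetsPerTrigger" setting described in this issue is a Kafka source option on the Structured Streaming reader. The sketch below shows where it is configured; the broker address and topic name are placeholder assumptions, and running it requires a Spark cluster and a Kafka 0.10+ broker, so it is a configuration sketch rather than a self-contained test.

```scala
import org.apache.spark.sql.SparkSession

object MaxOffsetsRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MaxOffsetsRepro").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumption: local broker
      .option("subscribe", "test-topic")                   // assumption: topic name
      .option("maxOffsetsPerTrigger", "1")                 // cap offsets read per micro-batch
      .load()

    // Each micro-batch will contain at most 1 offset per trigger across all partitions.
    df.writeStream.format("console").start().awaitTermination()
  }
}
```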