[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2021-02-16 Thread Julian (Jira)


[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285168#comment-17285168 ]

Julian commented on SPARK-21453:


Hi, it's been quiet on my side as we have been migrating HDP clusters for the
last 9+ months. We are now on HDP 3.1.5, but it still only ships Spark 2.3.2.
The Cloudera releases are always further behind, and on top of that we have our
own cluster migrations and customer innovations to deal with, so we are never
on the latest versions. We are still using the older Kafka 0.10 libraries
because the spark-streaming-kafka libraries are not available for a build
against Spark 2.3.2 and Kafka 2.0+.

For this case, the issue is still occurring on our new cluster. I think the
error is therefore likely still related to the Kafka 0.10 version, but I can't
test with Kafka 2+ libraries, since the spark-streaming libraries are not
available for 2.3.2, at least on HDP 3.1.5.

NOTE: The error is identical to before:
 
2021-02-16 10:54:05 WARN SslTransportLayer:166 - Failed to send SSL Close 
message
java.io.IOException: Broken pipe
 at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
 at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
 
I still suspect a relation to these two Kafka JIRAs (for the 0.10 version):
 
https://issues.apache.org/jira/browse/KAFKA-5649
https://issues.apache.org/jira/browse/KAFKA-3702
 

> Cached Kafka consumer may be closed too early
> -
>
> Key: SPARK-21453
> URL: https://issues.apache.org/jira/browse/SPARK-21453
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
> Environment: Spark 2.2.0 and kafka 0.10.2.0
>Reporter: Pablo Panero
>Priority: Major
>
> On a streaming job using the built-in Kafka source and sink (over SSL), I am
> getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", config.bootstrapServers)
>   .option("failOnDataLoss", value = false)
>   .option("kafka.connections.max.idle.ms", 360)
>   //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
>   .option("kafka.security.protocol", "SASL_SSL")
>   .option("kafka.sasl.mechanism", "GSSAPI")
>   .option("kafka.sasl.kerberos.service.name", "kafka")
>   .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>   .option("kafka.ssl.truststore.password", "changeit")
>   .option("subscribe", config.topicConfigList.keys.mkString(","))
>   .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
> .option("checkpointLocation", 
> s"${config.checkpointDir}/${topicConfig._1}/")
> .format("kafka")
> .option("kafka.bootstrap.servers", config.bootstrapServers)
> .option("kafka.connections.max.idle.ms", 360)
> //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.mechanism", "GSSAPI")
> .option("kafka.sasl.kerberos.service.name", "kafka")
> .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
> .option("kafka.ssl.truststore.password", "changeit")
> .start()
> {code}
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>   at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>   at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>   at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>   at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)

[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2019-11-08 Thread Julian (Jira)


[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969995#comment-16969995 ]

Julian commented on SPARK-21453:


As an update, the "Failed to send SSL Close message" issue is not causing
major problems; the Spark batches are still completing regardless of this
failure.

Importantly, the other error I was getting, "java.lang.NullPointerException",
appears unrelated. I have seemingly managed to avoid it by dropping the
kafka.jar / kafka-clients.jar back to the latest 0.11 Kafka release instead of
1.0. This seems to have resolved that issue for now, so that specific problem
appears to come from the Kafka libraries. That error was causing the batches to
fail periodically.
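For reference, a minimal sketch of how such a pin might look in an sbt build
(assuming the job is built with sbt; the artifact name and the exact 0.11.x
patch version here are illustrative, not taken from the actual build):

{code:scala}
// Hypothetical build.sbt fragment: force the Kafka client onto a 0.11.x
// release instead of the 1.0 line that was triggering the NullPointerException.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.11.0.3"
{code}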

The SSL Close message error still shows up in the log files every few hours
(for all our different streaming flows that are built in a similar way and have
continual load over many hours), so it is not hard to reproduce.


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2019-02-19 Thread Jungtaek Lim (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16771709#comment-16771709 ]

Jungtaek Lim commented on SPARK-21453:
--

[~ppanero] [~Julescs0]
I guess it's too late, but I missed this issue when I crafted my patch for
consumer pool renewal - it leverages a considered-as-stable pool implementation
(Apache Commons Pool) instead of Spark's own implementation.
https://github.com/apache/spark/pull/22138

We haven't tracked down the root cause, so I can't say whether my patch would
mitigate the issue, but I found suspicious cases where the current
implementation could leak (never release) idle consumers, so I would say it is
worth a try if you're having pain and still haven't resolved the issue.

This would also give a nice rationale for getting my patch reviewed and merged
soon, if it succeeds in resolving this issue.
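For illustration only, a minimal sketch of the pooling idea (not the actual
code from the PR above), using a keyed Apache Commons Pool 2 pool of Kafka
consumers. The key shape, eviction intervals and byte-array consumer type are
assumptions for the example; the point is that the pool, not ad-hoc caching
code, owns the consumer lifecycle, so idle consumers are evicted rather than
leaked:

{code:scala}
import java.util.Properties

import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool}
import org.apache.kafka.clients.consumer.KafkaConsumer

object PooledConsumers {
  type ByteConsumer = KafkaConsumer[Array[Byte], Array[Byte]]

  // One pooled consumer per (group id, topic, partition) - an illustrative key.
  case class ConsumerKey(groupId: String, topic: String, partition: Int)

  // Creates, wraps and destroys consumers on behalf of the pool.
  // baseProps is assumed to already contain bootstrap.servers and
  // byte-array key/value deserializers.
  class ConsumerFactory(baseProps: Properties)
      extends BaseKeyedPooledObjectFactory[ConsumerKey, ByteConsumer] {

    override def create(key: ConsumerKey): ByteConsumer = {
      val props = new Properties()
      props.putAll(baseProps)
      props.put("group.id", key.groupId)
      new KafkaConsumer[Array[Byte], Array[Byte]](props)
    }

    override def wrap(consumer: ByteConsumer): PooledObject[ByteConsumer] =
      new DefaultPooledObject(consumer)

    override def destroyObject(key: ConsumerKey, p: PooledObject[ByteConsumer]): Unit =
      p.getObject.close() // the pool closes consumers, callers never do
  }

  def newPool(baseProps: Properties): GenericKeyedObjectPool[ConsumerKey, ByteConsumer] = {
    val pool = new GenericKeyedObjectPool(new ConsumerFactory(baseProps))
    // Idle consumers are evicted on a schedule instead of being held forever.
    pool.setTimeBetweenEvictionRunsMillis(60 * 1000L)
    pool.setMinEvictableIdleTimeMillis(5 * 60 * 1000L)
    pool
  }
}

// Usage sketch: borrow for a fetch and always return in a finally block.
// val pool = PooledConsumers.newPool(props)
// val c = pool.borrowObject(key)
// try { /* c.poll(...) */ } finally pool.returnObject(key, c)
{code}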

Some information would be needed:

1) Which Spark version are you having difficulty with? This is needed to port
the patch back to that version. (For now the patch is against the master
branch.) You may want to note that Spark 2.2.x looks EOL in the Apache
community (2.3.x also later this year), so 2.2.x may not get support.

2) Is it easily reproducible? You may be hesitant to apply a not-yet-merged
patch to your production environment, so it would be even better if we could
reproduce it easily in a staging environment, or even locally - see the sketch
below.
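For what it's worth, here is a minimal sketch of the kind of self-contained
Kafka-to-Kafka job that could be used to try to reproduce this locally. The
broker address, topic names, checkpoint path and the presence of the
spark-sql-kafka-0-10 package on the classpath are all assumptions for the
example, not details from the original job:

{code:scala}
import org.apache.spark.sql.SparkSession

object LocalRepro {
  def main(args: Array[String]): Unit = {
    // Local mode so the cached-consumer behaviour can be observed on one machine.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("spark-21453-local-repro")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("failOnDataLoss", value = false)
      .option("subscribe", "in")
      .load()

    // Kafka-to-Kafka, same shape as the job in the issue description.
    val query = df.select("key", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "out")
      .option("checkpointLocation", "/tmp/spark-21453-checkpoint")
      .start()

    query.awaitTermination()
  }
}
{code}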

Thanks in advance!


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2019-02-19 Thread Jungtaek Lim (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16771714#comment-16771714 ]

Jungtaek Lim commented on SPARK-21453:
--

Btw, I just set a higher priority, since this bug affects multiple end users
(here and in KAFKA-5649).


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2018-12-14 Thread Julian (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721386#comment-16721386 ]

Julian commented on SPARK-21453:


Also, the issue is marked as Major on the other thread, but here it is only Minor?


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2018-12-14 Thread Julian (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721380#comment-16721380 ]

Julian commented on SPARK-21453:


I see this issue is also raised against Kafka under 
https://issues.apache.org/jira/browse/KAFKA-5649

 


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2018-11-30 Thread Julian (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16705222#comment-16705222 ]

Julian commented on SPARK-21453:


I also get the following error around the same time (shortly after the one
above). Maybe it is related?

18/11/30 18:04:17 ERROR Executor: Exception in task 2.0 in stage 115.0 (TID 
2377)
java.lang.NullPointerException
    at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:436)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:395)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:205)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:137)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:307)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
    at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
    at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
    at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
    at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at 
org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:52)
    at 
org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
    at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2018-11-30 Thread Julian (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16705215#comment-16705215 ]

Julian commented on SPARK-21453:


I see this issue is still open (although no one has posted here for a while).
I get exactly the same error periodically in our Spark 2.2 Structured Streaming
job, which runs on our HDP 2.6 cluster with Kafka (HDF). It is a pain, as it
causes tasks to fail, and with a load of currently 1.5 GB per 5 minutes (to be
scaled up 5+ times in the coming weeks/months) it increases the overall time
for a micro-batch to complete. Any ideas would be good (I'll check the link
above as well). Note that we are not using the experimental Continuous Mode in
this version...

18/11/30 18:04:02 WARN SslTransportLayer: Failed to send SSL Close message
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:212)
    at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:170)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:717)
    at org.apache.kafka.common.network.Selector.close(Selector.java:708)
    at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:500)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1156)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
    at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
    at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
    at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
    at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    …….

 


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2018-04-02 Thread Daniel Nitzan (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423445#comment-16423445 ]

Daniel Nitzan commented on SPARK-21453:
---

This seems to be an issue with Continuous Processing, with long-running tasks.


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2017-08-30 Thread Pablo Panero (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147419#comment-16147419 ]

Pablo Panero commented on SPARK-21453:
--

[~zsxwing]
It failed again, but this time there isn't even any context around it. I will
try running it locally for a longer period. Maybe I'm missing something.
{code}
17/08/29 04:47:45 WARN DomainSocketFactory: The short-circuit local reads 
feature cannot be used because libhadoop cannot be loaded.
17/08/29 04:47:45 WARN DomainSocketFactory: The short-circuit local reads 
feature cannot be used because libhadoop cannot be loaded.
17/08/29 04:47:45 WARN DomainSocketFactory: The short-circuit local reads 
feature cannot be used because libhadoop cannot be loaded.
17/08/29 10:41:13 WARN SslTransportLayer: Failed to send SSL Close message 
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
at org.apache.kafka.common.network.Selector.close(Selector.java:531)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at 
org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)

[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2017-08-09 Thread Shixiong Zhu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120393#comment-16120393 ]

Shixiong Zhu commented on SPARK-21453:
--

The error message looks like the Kafka broker storing the record for offset
7591351 is offline/in standby. If it recovers quickly, you can just restart
your query and it will resume from the failed point.
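For anyone hitting this, here is a minimal sketch of what "restart your query"
amounts to; the broker address, topic names and checkpoint path are
illustrative. The key point is that the restarted query uses the same
checkpointLocation as the failed run, so it resumes from the offsets recorded
there rather than reprocessing from scratch:

{code:scala}
import org.apache.spark.sql.SparkSession

// Re-running this after the broker recovers resumes from the checkpointed offsets.
val spark = SparkSession.builder().appName("restart-after-failure").getOrCreate()

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-1:9093")
  .option("subscribe", "events")
  .load()
  .select("key", "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-1:9093")
  .option("topic", "events-out")
  // Same checkpoint directory as the failed run - this is what makes the
  // restarted query continue from the failed point instead of the beginning.
  .option("checkpointLocation", "/checkpoints/events-flow/")
  .start()

query.awaitTermination()
{code}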


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2017-08-09 Thread Pablo Panero (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16119988#comment-16119988 ]

Pablo Panero commented on SPARK-21453:
--

[~zsxwing] Concerning the cached consumer failure, all I could get is:

{code}
17/08/09 03:55:49 WARN Client: Exception encountered while connecting to the 
server : 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): 
Operation category READ is not supported in state standby. Visit 
https://s.apache.org/sbnn-error
17/08/09 03:55:49 WARN Client: Exception encountered while connecting to the 
server : 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): 
Operation category READ is not supported in state standby. Visit 
https://s.apache.org/sbnn-error
17/08/09 03:55:49 WARN Client: Exception encountered while connecting to the 
server : 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): 
Operation category READ is not supported in state standby. Visit 
https://s.apache.org/sbnn-error
17/08/09 07:36:19 ERROR Executor: Exception in task 0.0 in stage 18285.0 (TID 
54855)
org.apache.kafka.common.errors.NetworkException: The server disconnected before 
a response was received.
17/08/09 07:36:19 ERROR Executor: Exception in task 0.0 in stage 18285.0 (TID 
54855)
org.apache.kafka.common.errors.NetworkException: The server disconnected before 
a response was received.
17/08/09 07:36:19 ERROR Executor: Exception in task 0.0 in stage 18285.0 (TID 
54855)
org.apache.kafka.common.errors.NetworkException: The server disconnected before 
a response was received.
17/08/09 07:38:27 ERROR Executor: Exception in task 0.1 in stage 18285.0 (TID 
54858)
java.util.concurrent.TimeoutException: Cannot fetch record for offset 7591351 
in 12 milliseconds
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:219)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at 
org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:47)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2017-08-04 Thread Pablo Panero (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114071#comment-16114071 ]

Pablo Panero commented on SPARK-21453:
--

Yes! The job has been launched in debug mode. I'll send the logs as soon as it 
fails :D
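
A minimal sketch of what launching in debug mode typically involves for this 
connector, assuming the stock Spark 2.2 logging setup (log4j 1.x); the logger 
names below are assumptions based on the packages that appear in the stack 
traces above.

{code:java}
// Sketch only: driver-side way to raise these loggers to DEBUG using the log4j 1.x
// API bundled with Spark 2.2. Executors additionally need the same levels in the
// log4j.properties they are started with, otherwise the consumer-side messages
// emitted inside the tasks will not be captured.
import org.apache.log4j.{Level, Logger}

Logger.getLogger("org.apache.spark.sql.kafka010").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.kafka").setLevel(Level.DEBUG)
{code}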


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2017-08-04 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114036#comment-16114036
 ] 

Shixiong Zhu commented on SPARK-21453:
--

I meant the exception in the JIRA description, which looks like a consumer 
issue. For the producer issue, please read 
https://issues.apache.org/jira/browse/SPARK-21453?focusedCommentId=16113218=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16113218


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2017-08-04 Thread Pablo Panero (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114033#comment-16114033
 ] 

Pablo Panero commented on SPARK-21453:
--

[~zsxwing] Yes, I definitely can. No worries, I just did not know about that 
workaround. This one is related to the producer; I guess you mean another issue 
on the consumer, right?

Thanks a lot!


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2017-08-04 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114030#comment-16114030
 ] 

Shixiong Zhu commented on SPARK-21453:
--

Could you increase "spark.kafka.producer.cache.timeout" to "60m" or longer, so 
that we can focus on the cached Kafka consumer issue?
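
A minimal sketch of where such an override could go, assuming the job builds 
its own SparkSession; the same value can also be passed to spark-submit as 
--conf spark.kafka.producer.cache.timeout=60m.

{code:java}
// Sketch only: keep cached Kafka producers alive for 60 minutes (instead of the
// default) so that producer closing can be ruled out while the cached consumer
// issue is investigated. The setting is read once when the session is created.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-ssl-streaming-job") // hypothetical application name
  .config("spark.kafka.producer.cache.timeout", "60m")
  .getOrCreate()
{code}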


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2017-08-04 Thread Pablo Panero (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114025#comment-16114025
 ] 

Pablo Panero commented on SPARK-21453:
--

[~zsxwing] Yes, I understand. However, due to the verbosity it is currently set 
to ERROR level, so the only thing I can get is:
{code}
17/08/03 14:18:06 ERROR Executor: Exception in task 4.0 in stage 35494.0 (TID 
354957)
java.lang.IllegalStateException: Cannot send after the producer is closed.
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:174)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
at 
org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:65)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
17/08/03 14:18:06 ERROR Executor: Exception in task 4.0 in stage 35494.0 (TID 
354957)
java.lang.IllegalStateException: Cannot send after the producer is closed.
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:174)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
at 
org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:65)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
17/08/03 14:18:06 ERROR Executor: Exception in task 4.0 in stage 35494.0 (TID 
354957)
java.lang.IllegalStateException: Cannot send after the producer is closed.
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:174)
at 

[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2017-08-04 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114020#comment-16114020
 ] 

Shixiong Zhu commented on SPARK-21453:
--

[~ppanero] Could you provide all logs? I need the logs before this exception to 
investigate the cause.


[jira] [Commented] (SPARK-21453) Cached Kafka consumer may be closed too early

2017-08-04 Thread Pablo Panero (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114015#comment-16114015
 ] 

Pablo Panero commented on SPARK-21453:
--

[~zsxwing]

{code}
17/07/26 13:28:32 ERROR Executor: Exception in task 9.1 in stage 2.0 (TID 
276107)
java.lang.IllegalStateException: Cannot send after the producer is closed.
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:174)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:65)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$1.apply(KafkaWriter.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:91)
at 
org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at 
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)


17/07/26 13:28:38 WARN SslTransportLayer: Failed to send SSL Close message 
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
at org.apache.kafka.common.network.Selector.close(Selector.java:531)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
at 
org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
at 
org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:148)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at