Prabhjot Singh Bharaj created SPARK-27409:
---------------------------------------------
Summary: Micro-batch support for Kafka Source in Spark 2.3
Key: SPARK-27409
URL: https://issues.apache.org/jira/browse/SPARK-27409
Project: Spark
Issue Type: Question
Components: Structured Streaming
Affects Versions: 2.3.2
Reporter: Prabhjot Singh Bharaj

It seems that with this change - [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50] - to the Kafka source provider in Spark 2.3, a Kafka source can only be run in continuous mode, not in micro-batch mode. Is that understanding correct?

{code:java}
E Py4JJavaError: An error occurred while calling o217.load.
E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
E 	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
E 	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
E 	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
E 	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
E 	at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
E 	at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
E 	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
E 	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
E 	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
E 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E 	at java.lang.reflect.Method.invoke(Method.java:498)
E 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E 	at py4j.Gateway.invoke(Gateway.java:282)
E 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
E 	at py4j.GatewayConnection.run(GatewayConnection.java:238)
E 	at java.lang.Thread.run(Thread.java:748)
E Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
E 	at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
E 	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
E 	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
E 	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
E 	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
E 	... 19 more
E Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
E 	at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
E 	at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
E 	... 23 more
E Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
E 	at java.io.FileInputStream.open0(Native Method)
E 	at java.io.FileInputStream.open(FileInputStream.java:195)
E 	at java.io.FileInputStream.<init>(FileInputStream.java:138)
E 	at java.io.FileInputStream.<init>(FileInputStream.java:93)
E 	at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
E 	at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
E 	at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
E 	at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
E 	... 24 more
{code}

When running a simple data stream loader for Kafka without an SSL cert, it goes through this code path:

{code:java}
...
E 	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
E 	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
E 	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
...
{code}

Note that I haven't selected {{trigger=continuous...}} when creating the dataframe, yet the code still goes through the continuous-reader path. My understanding was that continuous mode is optional and not the default. Please clarify.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
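For reference, here is a minimal PySpark sketch of how the execution mode is normally selected in Structured Streaming: the trigger is chosen on the write side via {{DataStreamWriter.trigger()}}, and continuous mode has to be requested explicitly with {{trigger(continuous=...)}}; with no trigger, or with {{processingTime}}, the query runs in micro-batch mode. The broker address {{localhost:9092}} and topic name {{events}} below are hypothetical placeholders, and the snippet requires a running Spark 2.3+ session plus a reachable Kafka broker.

```python
# Sketch only: assumes a live Spark 2.3+ session and a reachable Kafka
# broker. "localhost:9092" and the topic "events" are hypothetical.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("kafka-microbatch-sketch")
         .getOrCreate())

# Read side: no trigger is involved here; .load() only defines the source.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load())

# Write side: the trigger decides the execution mode. Omitting .trigger()
# (or passing processingTime, as here) runs the query in micro-batch mode;
# continuous mode must be opted into with trigger(continuous="1 second").
query = (df.selectExpr("CAST(value AS STRING)")
         .writeStream
         .format("console")
         .trigger(processingTime="5 seconds")  # micro-batch, stated explicitly
         .start())

query.awaitTermination()
```

Given that the trigger is only supplied at {{writeStream}} time, the question above is why {{createContinuousReader}} is already invoked during {{DataStreamReader.load()}}, before any trigger has been chosen.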