[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819809#comment-16819809 ]

Gabor Somogyi commented on SPARK-27409:
---------------------------------------

I mean: does this cause any data processing issue other than the stack trace?

> Micro-batch support for Kafka Source in Spark 2.3
> --------------------------------------------------
>
>                 Key: SPARK-27409
>                 URL: https://issues.apache.org/jira/browse/SPARK-27409
>             Project: Spark
>          Issue Type: Question
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: Prabhjot Singh Bharaj
>            Priority: Major
>
> It seems that with this change -
> [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50]
> in Spark 2.3, the Kafka source provider can only be run in continuous mode, not in micro-batch mode. Is that understanding correct?
> {code:java}
> E   Py4JJavaError: An error occurred while calling o217.load.
> E   : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> E       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> E       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
> E       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
> E       at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
> E       at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
> E       at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
> E       at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E       at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E       at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> E       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E       at java.lang.reflect.Method.invoke(Method.java:498)
> E       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E       at py4j.Gateway.invoke(Gateway.java:282)
> E       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E       at py4j.commands.CallCommand.execute(CallCommand.java:79)
> E       at py4j.GatewayConnection.run(GatewayConnection.java:238)
> E       at java.lang.Thread.run(Thread.java:748)
> E   Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E       at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
> E       at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
> E       at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
> E       at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
> E       at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
> E       ... 19 more
> E   Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E       at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
> E       at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
> E       ... 23 more
> E   Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
> E       at java.io.FileInputStream.open0(Native Method)
> E       at java.io.FileInputStream.open(FileInputStream.java:195)
> E       at java.io.FileInputStream.<init>(FileInputStream.java:138)
> E       at java.io.FileInputStream.<init>(FileInputStream.java:93)
> E       at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
> E       at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
> E       at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
> E       at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
> E       ... 24 more{code}
> When running a simple data stream loader for kafka without an SSL cert, it goes through this code block -
> {code:java}
> ...
> ...
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E       at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E       at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> ...
> ...{code}
> Note that I haven't selected `trigger=continuous...` when
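For readers skimming the thread: as worked out in the comments below, the execution mode is chosen by the trigger passed at write time, not by the reader class that load() happens to instantiate. A minimal PySpark 2.3 sketch of both modes, assuming the pyspark shell (where `spark` is the SparkSession) and a reachable non-SSL broker; the broker address, topic, and checkpoint paths are placeholders:

{code:python}
# Placeholder broker/topic; assumes a reachable broker without SSL.
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "some_topic") \
    .load()

# Micro-batch execution: the default, shown here with an explicit
# processing-time trigger.
micro = df.writeStream.format("console") \
    .option("checkpointLocation", "/tmp/ckpt-micro") \
    .trigger(processingTime="5 seconds") \
    .start()

# Continuous execution is only selected when a continuous trigger is requested.
cont = df.writeStream.format("console") \
    .option("checkpointLocation", "/tmp/ckpt-cont") \
    .trigger(continuous="1 second") \
    .start()
{code}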
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819473#comment-16819473 ]

Prabhjot Singh Bharaj commented on SPARK-27409:
-----------------------------------------------

[~gsomogyi] I haven't tried it on master. I'm facing the problem with Spark 2.3.2. Here is a complete log -

{code:java}
➜  ~/spark ((HEAD detached at v2.3.2)) ✗ ./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
Python 2.7.10 (default, Feb 22 2019, 21:17:52)
[GCC 4.2.1 Compatible Apple LLVM 10.0.1 (clang-1001.0.37.14)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: //.ivy2/cache
The jars for the packages stored in: //.ivy2/jars
:: loading settings :: url = jar:file://spark/assembly/target/scala-2.11/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b75b99d4-ae39-49b0-b366-8b718542b4f8;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.2 in central
	found org.apache.kafka#kafka-clients;0.10.0.1 in local-m2-cache
	found net.jpountz.lz4#lz4;1.3.0 in local-m2-cache
	found org.xerial.snappy#snappy-java;1.1.2.6 in local-m2-cache
	found org.slf4j#slf4j-api;1.7.16 in spark-list
	found org.spark-project.spark#unused;1.0.0 in local-m2-cache
:: resolution report :: resolve 1580ms :: artifacts dl 4ms
	:: modules in use:
	net.jpountz.lz4#lz4;1.3.0 from local-m2-cache in [default]
	org.apache.kafka#kafka-clients;0.10.0.1 from local-m2-cache in [default]
	org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.2 from central in [default]
	org.slf4j#slf4j-api;1.7.16 from spark-list in [default]
	org.spark-project.spark#unused;1.0.0 from local-m2-cache in [default]
	org.xerial.snappy#snappy-java;1.1.2.6 from local-m2-cache in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   6   |   2   |   2   |   0   ||   6   |   0   |
	---------------------------------------------------------------------
:: problems summary ::
:::: ERRORS
	unknown resolver null
	unknown resolver null
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
:: retrieving :: org.apache.spark#spark-submit-parent-b75b99d4-ae39-49b0-b366-8b718542b4f8
	confs: [default]
	0 artifacts copied, 6 already retrieved (0kB/6ms)
19/04/16 16:31:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Python version 2.7.10 (default, Feb 22 2019 21:17:52)
SparkSession available as 'spark'.
>>> df = spark.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",).option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "//spark/python/pyspark/sql/streaming.py", line 403, in load
    return self._df(self._jreader.load())
  File "//spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "//spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "//spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o37.load.
: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at
{code}
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818963#comment-16818963 ]

Gabor Somogyi commented on SPARK-27409:
---------------------------------------

After I've modified the code the same way as in the Scala case and checked it with 2.3.2 pyspark, here are my thoughts.

`trigger=continuous...` mainly controls the execution engine (MicroBatchExecution vs ContinuousExecution). See the code [here|https://github.com/apache/spark/blob/02b510728c31b70e6035ad541bfcdc2b59dcd79a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L243]. This code has been significantly rewritten on the master branch.

Does this cause any issue?
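A toy restatement of that dispatch (this is not Spark's actual code, which is Scala; it only models the branch in the linked StreamingQueryManager line):

{code:python}
# Toy model of the linked dispatch: a ContinuousTrigger selects
# ContinuousExecution, any other trigger (including the default)
# selects MicroBatchExecution. Not Spark's actual code.
def pick_execution(trigger_class: str) -> str:
    if trigger_class == "ContinuousTrigger":
        return "ContinuousExecution"
    return "MicroBatchExecution"

assert pick_execution("ProcessingTimeTrigger") == "MicroBatchExecution"
assert pick_execution("ContinuousTrigger") == "ContinuousExecution"
{code}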
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818882#comment-16818882 ]

Gabor Somogyi commented on SPARK-27409:
---------------------------------------

[~pbharaj]
{quote}Yes, I'm following the kafka integration guide linked.{quote}
{code:java}
$ ./bin/pyspark
Python 2.7.10 (default, Aug 17 2018, 17:41:52)
[GCC 4.2.1 Compatible Apple LLVM 10.0.0 (clang-1000.0.42)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
19/04/16 12:24:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Python version 2.7.10 (default, Aug 17 2018 17:41:52)
SparkSession available as 'spark'.
>>> df = sc.sql.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",).option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'SparkContext' object has no attribute 'sql'
>>>
{code}
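The AttributeError above is about the handle, not Kafka: in the pyspark shell, `sc` is the SparkContext and `spark` is the SparkSession, and readStream lives on the session. A corrected sketch of the call (broker and topic kept from the original repro):

{code:python}
# Use the SparkSession (`spark`), not the SparkContext (`sc`):
# readStream is a SparkSession property, and SparkContext has no `sql`.
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9093") \
    .option("subscribe", "no existing topic") \
    .load()
{code}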
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818070#comment-16818070 ]

Prabhjot Singh Bharaj commented on SPARK-27409:
-----------------------------------------------

{quote}"kafka.ssl.keystore.location", "non-existent"{quote}
This was set to test the flow.

{quote}Why do you write "df = sc.sql.readStream..." (it's just not working){quote}
I'm doing this in pyspark, where I get the spark context.
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818056#comment-16818056 ]

Gabor Somogyi commented on SPARK-27409:
---------------------------------------

Are you really sure you've followed the following integration guide?
https://spark.apache.org/docs/2.3.2/structured-streaming-kafka-integration.html
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16818051#comment-16818051 ]

Gabor Somogyi commented on SPARK-27409:
---------------------------------------

I don't understand a couple of things in your code:
* Why do you set "kafka.ssl.keystore.location", "non-existent"?
* Why do you write "df = sc.sql.readStream..."? (it's just not working)

{code:java}
scala> sc.sql.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("kafka.security.protocol", "SSL").option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", "non-existent").option("subscribe", "no existing topic").load()
<console>:25: error: value sql is not a member of org.apache.spark.SparkContext
       sc.sql.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("kafka.security.protocol", "SSL").option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", "non-existent").option("subscribe", "no existing topic").load()
          ^
{code}

Seems like it's working when I've executed the following:

{code:java}
scala> spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("kafka.security.protocol", "SSL").option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", "non-existent").option("subscribe", "no existing topic").load()
res0: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> res0.writeStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("checkpointLocation", "/tmp").start()
res2: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@389f4282

scala> 2019-04-15 17:04:03 WARN KafkaOffsetReader:87 - Error in attempt 1 getting Kafka offsets:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:85)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:199)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:288)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:287)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:287)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:286)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:197)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:197)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:255)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:196)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:195)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchReader$$anonfun$getOrCreateInitialPartitionOffsets$1.apply(KafkaMicroBatchReader.scala:190)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.getOrCreateInitialPartitionOffsets(KafkaMicroBatchReader.scala:190)
{code}

[~pbharaj] Have you tested it on the master branch?
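The same check translated to PySpark (this mirrors the Scala session above with the reporter's placeholder keystore; whether load() already throws, as in the 2.3.2 pyspark session elsewhere in this thread, or only the started query fails at offset fetching, as here, appears to depend on the build being tested):

{code:python}
# Same options as the Scala session above; keystore path and topic are the
# reporter's deliberate placeholders, so the consumer is expected to fail.
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9093") \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.keystore.password", "") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.keystore.location", "non-existent") \
    .option("subscribe", "no existing topic") \
    .load()

# The failing frames in the warning above are KafkaMicroBatchReader ones,
# i.e. the default trigger runs the micro-batch path.
query = df.writeStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9093") \
    .option("checkpointLocation", "/tmp") \
    .start()
{code}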
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815491#comment-16815491 ]

Prabhjot Singh Bharaj commented on SPARK-27409:
-----------------------------------------------

{code:java}
In [5]: df = sc.sql.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",)\
   ...: .option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()
2019-04-11 15:04:49.903 INFO ConsumerConfig: ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = earliest
	bootstrap.servers = [localhost:9093]
	check.crcs = true
	client.id =
	connections.max.idle.ms = 54
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = spark-kafka-source-a944d4a9-0257-492a-88b4-bba9decebb28-876181411-driver-0
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 30
	max.poll.records = 1
	metadata.max.age.ms = 30
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 3
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 6
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = SSL
	send.buffer.bytes = 131072
	session.timeout.ms = 1
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = non-existent
	ssl.keystore.password = [hidden]
	ssl.keystore.type = PKCS12
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
// in ()
----> 1 df = sc.sql.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",).option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()

//python/pyspark/sql/streaming.pyc in load(self, path, format, schema, **options)
    401             return self._df(self._jreader.load(path))
    402         else:
--> 403             return self._df(self._jreader.load())
    404
    405     @since(2.0)

//python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

//python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

//python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o147.load.
: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
{code}
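For context on the ConsumerConfig dump above: per the Kafka integration guide, every stream option carrying a `kafka.` prefix is stripped of the prefix and handed straight to the Kafka consumer, which is how `ssl.keystore.location = non-existent` ends up in the consumer config and triggers the FileNotFoundException. A toy illustration of that convention (not Spark's code):

{code:python}
# Toy illustration of the documented "kafka."-prefix convention; not Spark's code.
options = {
    "kafka.bootstrap.servers": "localhost:9093",
    "kafka.security.protocol": "SSL",
    "kafka.ssl.keystore.location": "non-existent",
    "subscribe": "no existing topic",  # Spark-level option: no "kafka." prefix
}
consumer_config = {k[len("kafka."):]: v
                   for k, v in options.items() if k.startswith("kafka.")}
print(consumer_config)
# {'bootstrap.servers': 'localhost:9093', 'security.protocol': 'SSL',
#  'ssl.keystore.location': 'non-existent'}
{code}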
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814314#comment-16814314 ]

Gabor Somogyi commented on SPARK-27409:
---------------------------------------

ping [~pbharaj]
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813094#comment-16813094 ]

Gabor Somogyi commented on SPARK-27409:
---------------------------------------

It would definitely help if you can provide your application code.
[jira] [Commented] (SPARK-27409) Micro-batch support for Kafka Source in Spark 2.3
[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812991#comment-16812991 ]

Shivu Sondur commented on SPARK-27409:
--------------------------------------

I am checking this.