I can compile the following code with any one of these jars, but none of
them prints the messages to the console. However, when I use
kafka-console-consumer with the same hello topic, I can see the messages.
When I run my Spark code, it just hangs here forever, even while I continue
producing messages.
17/09/11 21:59:12 INFO AppInfoParser: Kafka version : 0.10.0.1
17/09/11 21:59:12 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
17/09/11 21:59:12 INFO CodeGenerator: Code generated in 153.741942 ms
17/09/11 21:59:12 INFO CodeGenerator: Code generated in 9.219524 ms
17/09/11 21:59:12 INFO AbstractCoordinator: Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group spark-kafka-source-ea00925f-353e-49e7-8719-f812bde4c25a--849182030-executor.
compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: '2.2.0'
compile group: 'org.apache.spark', name: 'spark-sql-kafka-0-10_2.11', version: '2.2.0'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'
StreamingQuery query = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "hello")
        .option("startingOffsets", "earliest")
        .load()
        .writeStream()
        .format("console")
        .start();

query.awaitTermination();
On Mon, Sep 11, 2017 at 6:24 PM, kant kodali <[email protected]> wrote:
> Hi All,
>
> Did the Kafka dependency jars change for Spark Structured Streaming 2.2.0?
>
> kafka-clients-0.10.0.1.jar
>
> spark-streaming-kafka-0-10_2.11-2.2.0.jar
>
>
> 1) Are the above two the only Kafka-related jars, or am I missing something?
>
> 2) What is the difference between the above two jars?
>
> 3) Which jar do I use for the following code? It looks like I was able to
> compile with any one of the jars above, which is why it is a bit confusing.
>
>
> StreamingQuery query = sparkSession.readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers", "localhost:9092")
> .option("subscribe", "hello")
> .option("startingOffsets", "earliest")
> .load()
> .writeStream()
> .format("console")
> .start();
>
> query.awaitTermination();
>
>
> Thanks
>