Hi,
I couldn't reproduce this error :/ I wonder if something else
underlying is causing it...
*Input*
➜ kafka_2.12-2.5.0 ./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
{"name": "pedro", "age": 50}
>{"name": "pedro", "age": 50}
>{"name": "pedro", "age": 50}
>{"name": "pedro", "age": 50}
*Output*
➜ kafka_2.12-2.5.0 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sink
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":1}
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":2}
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":3}
{"value":"{\"name\": \"pedro\", \"age\": 50}","count":4}
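*Code*
import org.apache.spark.sql.functions.{count, lit}

// Read the raw records from the "test1" topic as strings.
// (`spark` is the SparkSession, e.g. the one provided by spark-shell.)
val rawDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test1")
  .load()
  .selectExpr("CAST(value AS STRING)")

// Count the occurrences of each distinct value.
val groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))

// Serialize each (value, count) row as JSON for the Kafka sink.
val kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")

kafka_stream_output
  .writeStream
  .format("kafka")
  .outputMode("update")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sink")
  .option("checkpointLocation", "/tmp/check")
  .start()

spark.streams.awaitAnyTermination()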
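
For comparison, the counts 1 to 4 in the output are consistent with update mode when each record lands in its own micro-batch (an assumption; records sharing a batch would produce fewer rows). Below is a rough plain-Scala sketch of that running count, with no Spark involved. `UpdateModeSketch` and `runningCounts` are made-up names for illustration, not Spark APIs:

```scala
// Plain-Scala illustration only: a running count per distinct value,
// emitting the updated (value, count) pair after every input record.
// This mimics update mode under the assumption of one record per micro-batch.
object UpdateModeSketch {

  // Hypothetical helper: returns one (value, count) pair per input record.
  def runningCounts(inputs: Seq[String]): Seq[(String, Long)] = {
    val start = (Map.empty[String, Long], Vector.empty[(String, Long)])
    val (_, emitted) = inputs.foldLeft(start) {
      case ((counts, out), value) =>
        val next = counts.getOrElse(value, 0L) + 1L
        (counts.updated(value, next), out :+ (value -> next))
    }
    emitted
  }

  def main(args: Array[String]): Unit = {
    val record = """{"name": "pedro", "age": 50}"""
    // Four identical inputs, as in the producer session above.
    runningCounts(Seq.fill(4)(record)).foreach {
      case (value, count) => println(s"$value -> count=$count")
    }
  }
}
```

If the sink topic shows something other than one increasing count per distinct value, that would point at the environment (for example the connector version) rather than at the query itself.
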
On Wed, 20 Jan 2021 at 23:22, gshen <[email protected]> wrote:
> This SO post is pretty much the exact same issue:
>
>
> https://stackoverflow.com/questions/59962680/spark-structured-streaming-error-while-sending-aggregated-result-to-kafka-topic
>
> The user mentions it's an issue with
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4