I quickly looked into the log attached to the SO post, and the problem doesn't
seem to be related to Kafka. The error stack trace comes from checkpointing to
GCS, and the OutputStream implementation for GCS appears to be provided
by Google.

Could you please share more of the stack trace, or upload the log with any
sensitive text redacted?

On Thu, Jan 21, 2021 at 2:38 PM German Schiavon

> Hi,
> I couldn't reproduce this error :/ I wonder if there is something else
> underlying that's causing it...
> *Input*
> ➜  kafka_2.12-2.5.0 ./bin/kafka-console-producer.sh --bootstrap-server
> localhost:9092 --topic test1
> {"name": "pedro", "age": 50}
> >{"name": "pedro", "age": 50}
> >{"name": "pedro", "age": 50}
> >{"name": "pedro", "age": 50}
> *Output*
> ➜  kafka_2.12-2.5.0 ./bin/kafka-console-consumer.sh --bootstrap-server
> localhost:9092 --topic sink
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":1}
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":2}
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":3}
> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":4}
> import org.apache.spark.sql.functions.{count, lit}
> val rawDF = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "test1")
>   .load
>   .selectExpr("CAST(value AS STRING)")
> val groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))
> val kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")
> kafka_stream_output
>   .writeStream
>   .format("kafka")
>   .outputMode("update")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("topic", "sink")
>   .option("checkpointLocation", "/tmp/check")
>   .start()
> spark.streams.awaitAnyTermination()
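The incrementing counts in the output above follow from `outputMode("update")`: Spark keeps a running count per `value` in state and, on each trigger, writes only the rows whose aggregate changed. Below is a minimal plain-Scala model of that behavior (no Spark involved), assuming each console record lands in its own micro-batch; `UpdateModeCountSketch` and `run` are hypothetical names for illustration, not Spark APIs.

```scala
import scala.collection.mutable

// Hypothetical plain-Scala model (not Spark) of a streaming
// groupBy("value").count() written with outputMode("update").
object UpdateModeCountSketch {
  // For every micro-batch, return the (value, runningCount) rows that
  // update mode would emit: only keys touched in that batch.
  def run(batches: Seq[Seq[String]]): Seq[Seq[(String, Long)]] = {
    // Running per-key counts, standing in for the state store.
    val state = mutable.Map.empty[String, Long]
    batches.map { batch =>
      // Aggregate the micro-batch first, then merge into the running state.
      val batchCounts = batch.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }
      val changedKeys = batch.distinct
      changedKeys.foreach { k => state(k) = state.getOrElse(k, 0L) + batchCounts(k) }
      // Emit one row per changed key with its updated total.
      changedKeys.map(k => k -> state(k))
    }
  }

  def main(args: Array[String]): Unit = {
    val record = """{"name": "pedro", "age": 50}"""
    // Assumption: four records, each arriving in its own micro-batch.
    val out = run(Seq(Seq(record), Seq(record), Seq(record), Seq(record)))
    out.flatten.foreach { case (v, c) => println(s"value=$v count=$c") }
  }
}
```

Running `main` prints four rows with counts 1 through 4, matching the sink topic. If two identical records arrived in one micro-batch, the model (like update mode) would emit a single row with the count advancing by 2.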
On Wed, 20 Jan 2021 at 23:22, gshen
>> This SO post describes pretty much the exact same issue:
>> https://stackoverflow.com/questions/59962680/spark-structured-streaming-error-while-sending-aggregated-result-to-kafka-topic
>> The user mentions it's an issue with
>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
