RE: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Chloe He
> ). \
>  avg('temperature')
> 
> # We take the above DataFrame and flatten it to get the columns
> # aliased as "startOfWindow", "endOfWindow" and "AVGTemperature"
> resultMF = resultM. \
>            select( \
>              F.col("window.start").alias("startOfWindow") \
>            , F.col("window.end").alias("endOfWindow") \
>            , F.col("avg(temperature)").alias("AVGTemperature"))
> 
> # Kafka producer requires a key, value pair. We generate a UUID
> # key as the unique identifier of the Kafka record.
> uuidUdf = F.udf(lambda: str(uuid.uuid4()), StringType())
> 
> """
> We take DataFrame resultMF containing temperature info and
> write it to Kafka. The uuid is serialized as a string and used as the key.
> We take all the columns of the DataFrame and serialize them as
> a JSON string, putting the results in the "value" of the record.
> """
> result = resultMF.withColumn("uuid", uuidUdf()) \
>  .selectExpr("CAST(uuid AS STRING) AS key",
>              "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
>  .writeStream \
>  .outputMode('complete') \
>  .format("kafka") \
>  .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
>  .option("topic", "avgtemperature") \
>  .option('checkpointLocation', checkpoint_path) \
>  .queryName("avgtemperature") \
>  .start()
> 
> except Exception as e:
> print(f"""{e}, quitting""")
> sys.exit(1)
> 
> #print(result.status)
> #print(result.recentProgress)
> #print(result.lastProgress)
> 
> result.awaitTermination()
> 
> Now try to use SQL for the entire transformation and aggregation
> 
> # Import this and anything else needed
> from pyspark.sql.functions import from_json, col, window
> from pyspark.sql.types import (StructType, StringType, IntegerType,
>                                FloatType, TimestampType)
> 
> 
> # Define the schema for the JSON data
> schema = ... # Replace with your schema definition
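> # For example (an assumption -- these field names are taken from the SQL
> # query further below; adjust to the actual topic payload):
> schema = StructType() \
>     .add("createTime", TimestampType()) \
>     .add("provinceId", IntegerType()) \
>     .add("payAmount", FloatType())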
> 
> # Construct a streaming DataFrame 'streamingDataFrame' that
> # subscribes to topic "temperature"
> streamingDataFrame = (spark
>     .readStream
>     .format("kafka")
>     .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'])
>     .option("schema.registry.url", config['MDVariables']['schemaRegistryURL'])
>     .option("group.id", config['common']['appName'])
>     .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs'])
>     .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS'])
>     .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs'])
>     .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS'])
>     .option("subscribe", "temperature")
>     .option("failOnDataLoss", "false")
>     .option("includeHeaders", "true")
>     .option("startingOffsets", "earliest")
>     .load()
>     .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
>     .select("parsed_value.*")
>     .withWatermark("createTime", "10 seconds"))  # Define the watermark here
> 
> # Create a temporary view from the streaming DataFrame with watermark
> streamingDataFrame.createOrReplaceTempView("michboy")
> 
> # Execute SQL queries on the temporary view
> result_df = (spark.sql("""
> SELECT
>     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> FROM michboy
> GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> ORDER BY window.start
> """)
>   .writeStream
>   .format("kafka")

[Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Chloe He
Hello!

I am attempting to write a streaming pipeline that consumes data from a Kafka 
source, manipulates the data, and then writes the results to a downstream sink 
(Kafka, Redis, etc.). I want to write fully formed SQL instead of using the 
function API that Spark offers. I read a few guides on how to do this, and my 
understanding is that I need to create a temp view in order to execute my raw 
SQL queries via spark.sql().
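
For context, the pattern I'm following looks roughly like this (a minimal 
sketch; the topic and column names are placeholders):
```
# Minimal sketch of the temp-view pattern (names are placeholders)
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "some_topic") \
    .load()

df.createOrReplaceTempView("events")
result = spark.sql("SELECT CAST(value AS STRING) FROM events")
```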

However, I’m having trouble defining watermarks on my source. It doesn’t seem 
like there is a way to introduce a watermark in the raw SQL that Spark 
supports, so I’m using the .withWatermark() function. But this watermark does 
not carry over to the temp view.

Example code:
```
json_df = streaming_df.select(from_json(col("value").cast("string"),
    schema).alias("parsed_value")).select("parsed_value.*").withWatermark("createTime", "10 seconds")

json_df.createOrReplaceTempView("json_df")

session.sql("""
SELECT
window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
FROM json_df
GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
ORDER BY window.start
""")\
.writeStream\
.format("kafka") \
.option("checkpointLocation", "checkpoint") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sink") \
.start()
```
This throws
```
AnalysisException: Append output mode not supported when there are streaming 
aggregations on streaming DataFrames/DataSets without watermark;
```

If I switch out the SQL query and write it in the function API instead, 
everything seems to work fine.
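
For reference, the function-API version that works looks roughly like this (a 
sketch using the same columns as above):
```
from pyspark.sql import functions as F

result = json_df \
    .withWatermark("createTime", "10 seconds") \
    .groupBy(F.window("createTime", "1 hour", "30 minutes"), "provinceId") \
    .agg(F.sum("payAmount").alias("totalPayAmount")) \
    .select("window.start", "window.end", "provinceId", "totalPayAmount")
```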

How can I use .sql() in conjunction with watermarks?

Best,
Chloe