Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Mich Talebzadeh
hm. you are getting below

AnalysisException: Append output mode not supported when there are
streaming aggregations on streaming DataFrames/DataSets without watermark;

The problem seems to be that you are using the append output mode when
writing the streaming query results to Kafka. This mode is designed for
scenarios where you want to append new data to an existing dataset at the
sink (in this case, the "sink" topic in Kafka). However, your query
involves a streaming aggregation: group by provinceId, window('createTime',
'1 hour', '30 minutes'). The problem is that Spark Structured Streaming
requires a watermark to ensure exactly-once processing when using
aggregations with append mode. Your code already defines a watermark on the
"createTime" column with a delay of 10 seconds (withWatermark("createTime",
"10 seconds")). However, the error message indicates it is missing on the
start column. Try adding watermark to "start" Column: Modify your code as
below  to include a watermark on the "start" column generated by the window
function:

from pyspark.sql.functions import col, from_json, explode, window, sum,
watermark

streaming_df = session.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "payment_msg") \
  .option("startingOffsets", "earliest") \
  .load() \
  .select(from_json(col("value").cast("string"),
schema).alias("parsed_value")) \
  .select("parsed_value.*") \
  .withWatermark("createTime", "10 seconds")  # Existing watermark on
createTime

*# Modified section with watermark on 'start' column*
streaming_df = streaming_df.groupBy(
  col("provinceId"),
  window(col("createTime"), "1 hour", "30 minutes")
).agg(
  sum(col("payAmount")).alias("totalPayAmount")
).withWatermark(expr("start"), "10 seconds")  # Watermark on
window-generated 'start'

# Rest of the code remains the same
streaming_df.createOrReplaceTempView("streaming_df")

spark.sql("""
SELECT
  window.start, window.end, provinceId, totalPayAmount
FROM streaming_df
ORDER BY window.start
""") \
.writeStream \
.format("kafka") \
.option("checkpointLocation", "checkpoint") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sink") \
.start()

Try and see how it goes

HTH

Mich Talebzadeh,

Technologist | Solutions Architect | Data Engineer  | Generative AI

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed . It is essential to note that, as with
any advice, quote "one test result is worth one-thousand expert opinions
(Werner Von Braun)".

Mich Talebzadeh,
Technologist | Solutions Architect | Data Engineer  | Generative AI
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  Von
Braun )".


On Tue, 2 Apr 2024 at 22:43, Chloe He  wrote:

> Hi Mich,
>
> Thank you so much for your response. I really appreciate your help!
>
> You mentioned "defining the watermark using the withWatermark function on
> the streaming_df before creating the temporary view” - I believe this is
> what I’m doing and it’s not working for me. Here is the exact code snippet
> that I’m running:
>
> ```
> >>> streaming_df = session.readStream\
> .format("kafka")\
> .option("kafka.bootstrap.servers", "localhost:9092")\
> .option("subscribe", "payment_msg")\
> .option("startingOffsets","earliest")\
> .load()\
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))\
> .select("parsed_value.*")\
> .withWatermark("createTime", "10 seconds")
>
> >>> streaming_df.createOrReplaceTempView("streaming_df”)
>
> >>> spark.sql("""
> SELECT
> window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> FROM streaming_df
> GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> ORDER BY window.start
> """)\
>   .withWatermark("start", "10 seconds")\
>   .writeStream\
>   .format("kafka") \
>   .option("checkpointLocation", "checkpoint") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("topic", "sink") \
>   .start()
>
> AnalysisException: Append output mode not supported when there are
> streaming aggregations on streaming DataFrames/DataSets without watermark;
> EventTimeWatermark start#37: timestamp, 10 seconds
> ```
>
> I’m using pyspark 3.5.1. Please let me know if I missed something. Thanks
> again!
>
> Best,
> Chloe
>
>
> On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
> > ok let us 

RE: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Chloe He
Hi Mich,

Thank you so much for your response. I really appreciate your help!

You mentioned "defining the watermark using the withWatermark function on the 
streaming_df before creating the temporary view” - I believe this is what I’m 
doing and it’s not working for me. Here is the exact code snippet that I’m 
running:

```
>>> streaming_df = session.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", "payment_msg")\
.option("startingOffsets","earliest")\
.load()\
.select(from_json(col("value").cast("string"), 
schema).alias("parsed_value"))\
.select("parsed_value.*")\
.withWatermark("createTime", "10 seconds")

>>> streaming_df.createOrReplaceTempView("streaming_df”)

>>> spark.sql("""
SELECT
window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
FROM streaming_df
GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
ORDER BY window.start
""")\
  .withWatermark("start", "10 seconds")\
  .writeStream\
  .format("kafka") \
  .option("checkpointLocation", "checkpoint") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("topic", "sink") \
  .start()

AnalysisException: Append output mode not supported when there are streaming 
aggregations on streaming DataFrames/DataSets without watermark;
EventTimeWatermark start#37: timestamp, 10 seconds
```

I’m using pyspark 3.5.1. Please let me know if I missed something. Thanks again!

Best,
Chloe


On 2024/04/02 20:32:11 Mich Talebzadeh wrote:
> ok let us take it for a test.
> 
> The original code of mine
> 
> def fetch_data(self):
> self.sc.setLogLevel("ERROR")
> schema = StructType() \
>  .add("rowkey", StringType()) \
>  .add("timestamp", TimestampType()) \
>  .add("temperature", IntegerType())
> checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
> try:
> 
> # construct a streaming dataframe 'streamingDataFrame' that
> subscribes to topic temperature
> streamingDataFrame = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['appName']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", "temperature") \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "earliest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
> 
> 
> resultC = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.timestamp").alias("timestamp") \
>, col("parsed_value.temperature").alias("temperature"))
> 
> """
> We work out the window and the AVG(temperature) in the window's
> timeframe below
> This should return back the following Dataframe as struct
> 
>  root
>  |-- window: struct (nullable = false)
>  ||-- start: timestamp (nullable = true)
>  ||-- end: timestamp (nullable = true)
>  |-- avg(temperature): double (nullable = true)
> 
> """
> resultM = resultC. \
>  withWatermark("timestamp", "5 minutes"). \
>  groupBy(window(resultC.timestamp, "5 minutes", "5
> minutes")). \
>  avg('temperature')
> 
> # We take the above DataFrame and flatten it to get the columns
> aliased as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
> resultMF = resultM. \
>select( \
> F.col("window.start").alias("startOfWindow") \
>   , F.col("window.end").alias("endOfWindow") \
>   ,
> F.col("avg(temperature)").alias("AVGTemperature"))
> 
> # Kafka producer requires a key, value pair. We generate UUID
> key as the unique identifier of Kafka record
> uuidUdf= F.udf(lambda : str(uuid.uuid4()),StringType())
> 
> """
> We take DataFrame resultMF containing temperature info and
> write it to Kafka. The uuid is serialized as a 

Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Mich Talebzadeh
ok let us take it for a test.

The original code of mine

def fetch_data(self):
self.sc.setLogLevel("ERROR")
schema = StructType() \
 .add("rowkey", StringType()) \
 .add("timestamp", TimestampType()) \
 .add("temperature", IntegerType())
checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
try:

# construct a streaming dataframe 'streamingDataFrame' that
subscribes to topic temperature
streamingDataFrame = self.spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers'],) \
.option("schema.registry.url",
config['MDVariables']['schemaRegistryURL']) \
.option("group.id", config['common']['appName']) \
.option("zookeeper.connection.timeout.ms",
config['MDVariables']['zookeeperConnectionTimeoutMs']) \
.option("rebalance.backoff.ms",
config['MDVariables']['rebalanceBackoffMS']) \
.option("zookeeper.session.timeout.ms",
config['MDVariables']['zookeeperSessionTimeOutMs']) \
.option("auto.commit.interval.ms",
config['MDVariables']['autoCommitIntervalMS']) \
.option("subscribe", "temperature") \
.option("failOnDataLoss", "false") \
.option("includeHeaders", "true") \
.option("startingOffsets", "earliest") \
.load() \
.select(from_json(col("value").cast("string"),
schema).alias("parsed_value"))


resultC = streamingDataFrame.select( \
 col("parsed_value.rowkey").alias("rowkey") \
   , col("parsed_value.timestamp").alias("timestamp") \
   , col("parsed_value.temperature").alias("temperature"))

"""
We work out the window and the AVG(temperature) in the window's
timeframe below
This should return back the following Dataframe as struct

 root
 |-- window: struct (nullable = false)
 ||-- start: timestamp (nullable = true)
 ||-- end: timestamp (nullable = true)
 |-- avg(temperature): double (nullable = true)

"""
resultM = resultC. \
 withWatermark("timestamp", "5 minutes"). \
 groupBy(window(resultC.timestamp, "5 minutes", "5
minutes")). \
 avg('temperature')

# We take the above DataFrame and flatten it to get the columns
aliased as "startOfWindowFrame", "endOfWindowFrame" and "AVGTemperature"
resultMF = resultM. \
   select( \
F.col("window.start").alias("startOfWindow") \
  , F.col("window.end").alias("endOfWindow") \
  ,
F.col("avg(temperature)").alias("AVGTemperature"))

# Kafka producer requires a key, value pair. We generate UUID
key as the unique identifier of Kafka record
uuidUdf= F.udf(lambda : str(uuid.uuid4()),StringType())

"""
We take DataFrame resultMF containing temperature info and
write it to Kafka. The uuid is serialized as a string and used as the key.
We take all the columns of the DataFrame and serialize them as
a JSON string, putting the results in the "value" of the record.
"""
result = resultMF.withColumn("uuid",uuidUdf()) \
 .selectExpr("CAST(uuid AS STRING) AS key",
"to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
 .writeStream \
 .outputMode('complete') \
 .format("kafka") \
 .option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers'],) \
 .option("topic", "avgtemperature") \
 .option('checkpointLocation', checkpoint_path) \
 .queryName("avgtemperature") \
 .start()

except Exception as e:
print(f"""{e}, quitting""")
sys.exit(1)

#print(result.status)
#print(result.recentProgress)
#print(result.lastProgress)

result.awaitTermination()

Now try to use sql for the entire transformation and aggression

#import this and anything else needed
from pyspark.sql.functions import from_json, col, window
from pyspark.sql.types import StructType, StringType,IntegerType,
FloatType, TimestampType


# Define the schema for the JSON data
schema = ... # Replace with your schema definition

# construct a streaming dataframe 'streamingDataFrame' that
subscribes to topic temperature
streamingDataFrame = self.spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers'],) \

[Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Chloe He
Hello!

I am attempting to write a streaming pipeline that would consume data from a 
Kafka source, manipulate the data, and then write results to a downstream sink 
(Kafka, Redis, etc). I want to write fully formed SQL instead of using the 
function API that Spark offers. I read a few guides on how to do this and my 
understanding is that I need to create a temp view in order to execute my raw 
SQL queries via spark.sql(). 

However, I’m having trouble defining watermarks on my source. It doesn’t seem 
like there is a way to introduce watermark in the raw SQL that Spark supports, 
so I’m using the .withWatermark() function. However, this watermark does not 
work on the temp view.

Example code:
```
streaming_df.select(from_json(col("value").cast("string"), 
schema).alias("parsed_value")).select("parsed_value.*").withWatermark("createTime",
 "10 seconds”)

json_df.createOrReplaceTempView("json_df”)

session.sql("""
SELECT
window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
FROM json_df
GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
ORDER BY window.start
""")\
.writeStream\
.format("kafka") \
.option("checkpointLocation", "checkpoint") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sink") \
.start()
```
This throws
```
AnalysisException: Append output mode not supported when there are streaming 
aggregations on streaming DataFrames/DataSets without watermark;
```

If I switch out the SQL query and write it in the function API instead, 
everything seems to work fine.

How can I use .sql() in conjunction with watermarks?

Best,
Chloe