OK, it sounds like your watermark is applied outside of your streaming query.
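
As for the TypeError at the bottom of your traceback: "'module' object is not callable" means the name being called is bound to a module, not a function. One common cause (an assumption on my part, since I have not seen your import lines) is importing the pyspark.sql.window module where the window() function from pyspark.sql.functions was intended. A minimal sketch that reproduces the same class of error with the standard library only, using datetime as a stand-in; the diagnose() helper is hypothetical and only there for illustration:

```python
import types
import datetime  # stdlib stand-in: a module that contains a class of the same name


def diagnose(name, obj):
    """Say whether obj can be called like a function (hypothetical helper)."""
    if isinstance(obj, types.ModuleType):
        return f"{name} is a module; import the function you need from it"
    if callable(obj):
        return f"{name} is callable"
    return f"{name} is not callable"


# Calling the module itself raises the same TypeError as in the traceback.
try:
    datetime(2022, 2, 16)
except TypeError as exc:
    error_message = str(exc)

print(error_message)
print(diagnose("datetime", datetime))
print(diagnose("datetime.datetime", datetime.datetime))

# The analogous PySpark mix-up (assumed, not confirmed for this job) would be:
#   from pyspark.sql import window            # binds the module pyspark.sql.window
#   from pyspark.sql.functions import window  # binds the window() function
```

If that is the cause, a quick print(type(window)) just before the failing line in computeCount should show a module where a function was expected.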

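Check this example, where the watermark is set on the streaming DataFrame itself, before the windowed aggregation: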

            # Construct a streaming DataFrame, streamingDataFrame, that
            # subscribes to the topic "temperature".
            # Assumes: from pyspark.sql.functions import col, from_json
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers",
                        config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url",
                        config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms",
                        config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms",
                        config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms",
                        config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms",
                        config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", "temperature") \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"),
                                  schema).alias("parsed_value"))


            # Flatten the parsed JSON into top-level columns.
            resultM = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.timestamp").alias("timestamp") \
                   , col("parsed_value.temperature").alias("temperature"))
            # window() below is the function from pyspark.sql.functions.
            # The watermark is set on the streaming DataFrame before groupBy.
            result = resultM. \
                     withWatermark("timestamp", "5 minutes"). \
                     groupBy(window(resultM.timestamp, "5 minutes", "5 minutes")). \
                     avg('temperature'). \
                     writeStream. \
                     outputMode('complete'). \
                     option("numRows", 1000). \
                     option("truncate", "false"). \
                     format('console'). \
                     option('checkpointLocation', checkpoint_path). \
                     queryName("temperature"). \
                     start()
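
To make the window and watermark arguments above concrete, here is a rough plain-Python sketch of the arithmetic, with no Spark involved. It assumes tumbling windows aligned to the Unix epoch (which is how window() behaves when no start offset is given) and treats the watermark simply as the latest event time seen minus the delay. The function names tumbling_window and watermark are made up for illustration, and Spark's own bookkeeping per trigger is more involved than this.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(event_time, size=timedelta(minutes=5)):
    """Return (start, end) of the epoch-aligned window containing event_time."""
    offset = (event_time - EPOCH) % size  # how far into its window the event falls
    start = event_time - offset
    return start, start + size            # window is [start, end)


def watermark(max_event_time_seen, delay=timedelta(minutes=5)):
    """Simplified threshold: the latest event time seen minus the allowed delay."""
    return max_event_time_seen - delay


ts = datetime(2022, 2, 16, 6, 37, 42, tzinfo=timezone.utc)
start, end = tumbling_window(ts)
print(start.isoformat(), end.isoformat())
print(watermark(ts).isoformat())
```

So a reading stamped 06:37:42 UTC lands in the 06:35 to 06:40 window, and with a 5 minute delay, rows with event times older than roughly 06:32:42 may be treated as late once that reading has been seen.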

HTH










On Wed, 16 Feb 2022 at 06:37, karan alang <karan.al...@gmail.com> wrote:

>
> Hello All,
>
> I have a Structured Streaming PySpark program running on GCP Dataproc,
> which reads data from Kafka and does some data massaging and aggregation.
> I'm trying to use withWatermark(), and it is giving an error.
>
> py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
>   File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
>     return_value = getattr(self.pool[obj_id], method)(*params)
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in call
>     raise e
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 193, in call
>     self.func(DataFrame(jdf, self.sql_ctx), batch_id)
>   File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 444, in convertToDictForEachBatch
>     ap = Alarm(tdict, spark)
>   File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 356, in __init__
>     computeCount(l_alarm_df, l_alarm1_df)
>   File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 262, in computeCount
>     window(col("timestamp"), "10 minutes").alias("window")
> TypeError: 'module' object is not callable
>
> Details are in the Stack Overflow question below:
>
> https://stackoverflow.com/questions/71137296/structuredstreaming-withwatermark-typeerror-module-object-is-not-callable
>
> Any ideas on how to debug/fix this?
> TIA!
>
