Re: StructuredStreaming - processing data based on new events in Kafka topic

2022-03-13 Thread karan alang
Hi Mich,

The code I sent for the function 'convertToDictForEachBatch' is not the
complete code; it does use the DataFrame (df) to do a bunch of
transformations/operations.

Specific to the problem I emailed about:
one piece of the code reloads the prediction data from BigQuery based on
an 'event' in the topic; the event indicates that the prediction data in
BigQuery has changed.
Here is the code with comments; I hope that clarifies things.

```

# called from - foreachBatch
def convertToDictForEachBatch(df, batchId):

    # Uses the dataframe to do processing of data; that code is not
    # added, since it is not relevant to this question.

    # Additional processing i.e. reloading of prediction data from
    # BigQuery into a DataFrame, based on an event in the Kafka topic.
    # Checks for an event in topic_reloadpred; further processing
    # takes place if there is new data in the topic.

    # Requirement: read data from topic_reloadpred, then check if
    # additional rows have been added; if yes, call the method to
    # reload data from BigQuery.

    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

    # If the events DataFrame has new records (since the last read time),
    # call the method to reload data from BigQuery
    reloadDataFromBigQuery()

```
The requirement is to identify that new rows have been added to the topic
topic_reloadpred, and then reload the data from BigQuery into a DataFrame,
if required.
(Please note: the data loaded from BigQuery is persisted, i.e. df.persist(),
and changes infrequently.)
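
For context, the reload step itself would be roughly along these lines (just
a sketch - df_pred, bq_prediction_table and the use of the spark-bigquery
connector's 'table' option are placeholders, not the actual code):

```
# Hypothetical sketch of reloadDataFromBigQuery(): drop the cached copy and
# re-read + re-persist the prediction data.
df_pred = None  # cached prediction DataFrame (placeholder module-level name)

def reloadDataFromBigQuery():
    global df_pred
    if df_pred is not None:
        df_pred.unpersist()                   # release the stale cached copy
    df_pred = spark.read.format("bigquery") \
        .option("table", bq_prediction_table) \
        .load() \
        .persist()
```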

One idea is to store the max offset from each batch read of topic_reloadpred
and, when the next batch is read, compare it with the 'stored' max offset
to determine whether new records have been added to the topic - see the
sketch below.
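
Roughly along these lines (just a sketch of the idea - last_max_offsets and
has_new_events are made-up names):

```
from pyspark.sql import functions as F

# highest offset seen per partition of topic_reloadpred on the previous read
# (placeholder module-level cache)
last_max_offsets = {}

def has_new_events(events_df):
    """Return True if any partition has offsets beyond the previous read."""
    global last_max_offsets
    current = {row['partition']: row['max_offset']
               for row in events_df.groupBy("partition")
                                   .agg(F.max("offset").alias("max_offset"))
                                   .collect()}
    new_data = any(off > last_max_offsets.get(part, -1)
                   for part, off in current.items())
    last_max_offsets = current
    return new_data

# inside convertToDictForEachBatch, after reading topic_reloadpred:
#     if has_new_events(events):
#         reloadDataFromBigQuery()
```

The stored offsets could also be passed back into the batch read via the
Kafka source's startingOffsets option, so that each read fetches only the
records added since the previous read instead of the whole topic.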

What is the best way to fulfill this requirement?

regds,
Karan Alang







Re: StructuredStreaming - processing data based on new events in Kafka topic

2022-03-12 Thread Mich Talebzadeh
There are a number of flaws here.

You have defined your trigger based on processing time within Spark
Structured Streaming (SSS) as below:

trigger(processingTime='4 minutes')


SSS will trigger every 4 minutes; in other words, the micro-batch interval
is 4 minutes. The way this works is that SSS is kicked off every 4 minutes.
If the previous batch finished in 1 minute, then SSS will wait for (4 - 1 =
3) minutes before processing again. If the previous processing took 5
minutes to finish, then there is a potential backlog and SSS will process
immediately after the previous job finishes (in other words, it kicks off
the next micro-batch straight away).


Now, foreachBatch(convertToDictForEachBatch) performs custom write logic on
each micro-batch through the convertToDictForEachBatch function. That
function is expected to take 2 parameters: first, the micro-batch as a
DataFrame or Dataset, and second, a unique id for each batch. However,
looking at the function convertToDictForEachBatch as below

def convertToDictForEachBatch(df, batchId):

    # checks for event in topic events_topic; further processing
    # takes place if there is new data in the topic
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

there seems to be no use of df (the micro-batch DataFrame) anywhere in this
code. So what are you checking here? And what happens if df is empty?
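
A minimal guard, for example, would be something along these lines (a
sketch; the early return is not in the posted code):

```
def convertToDictForEachBatch(df, batchId):
    # skip all work when the incoming micro-batch has no rows
    if df.rdd.isEmpty():          # or df.isEmpty() on Spark 3.3+
        return
    # ... transformations on df, event check, reload, etc. ...
```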

HTH



view my Linkedin profile

https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 12 Mar 2022 at 09:33, Mich Talebzadeh wrote:

> How do you check if new data is in the topic and what happens if not?

Re: StructuredStreaming - processing data based on new events in Kafka topic

2022-03-12 Thread Mich Talebzadeh
How do you check if new data is in the topic and what happens if not?

On Sat, 12 Mar 2022 at 00:40, karan alang  wrote:

> Hello All,
>
> I have a structured Streaming program, which reads data from Kafka topic,
> and does some processing, and finally puts data into target Kafka Topic.
>
> Note: the processing of the data in the topic is done in the function
> convertToDictForEachBatch(), which is called using
> foreachBatch(convertToDictForEachBatch)
>
> As part of the processing, it reads another Kafka Topic (events_topic),
> and if there is New record(s) after the last read, it does some additional
> processing - reloads data from BigQuery table, and persists it.
>
> Here is the code :
>
> ```
>
> df_stream = spark.readStream.format('kafka') \
>     .option("kafka.security.protocol", "SSL") \
>     .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>     .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>     .option("kafka.ssl.keystore.location", ssl_keystore_location) \
>     .option("kafka.ssl.keystore.password", ssl_keystore_password) \
>     .option("kafka.bootstrap.servers", kafkaBrokers) \
>     .option("subscribe", topic) \
>     .option("kafka.group.id", consumerGroupId) \
>     .option("startingOffsets", "latest") \
>     .option("failOnDataLoss", "false") \
>     .option("maxOffsetsPerTrigger", 1) \
>     .load()
>
> print(" df_stream -> ", df_stream)
>
> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
>     .outputMode("append") \
>     .trigger(processingTime='4 minutes') \
>     .option("numRows", 1) \
>     .option("truncate", "false") \
>     .option("checkpointLocation", checkpoint) \
>     .foreachBatch(convertToDictForEachBatch) \
>     .start()
>
> query.awaitTermination()
>
> ```
>
> # called from - foreachbatch
> def convertToDictForEachBatch(df, batchId):
>
>     # checks for event in topic events_topic; further processing takes
>     # place if there is new data in the topic
>     events = spark.read.format('kafka') \
>         .option("kafka.bootstrap.servers", kafkaBrokers) \
>         .option("kafka.security.protocol", "SSL") \
>         .option("kafka.ssl.truststore.location", ssl_truststore_location) \
>         .option("kafka.ssl.truststore.password", ssl_truststore_password) \
>         .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
>         .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
>         .option("subscribe", topic_reloadpred) \
>         .option("kafka.group.id", consumerGroupId_reloadpred) \
>         .load()
>
>     # events is passed to a function, and processing is done if new
>     # events have been generated
>
> ```
>
> What is the best way to achieve this? The current code is reading the
> entire data in the Kafka topic; I need it to read only the new data.
>
> Additional Details in stackoverflow :
>
>
> https://stackoverflow.com/questions/71446023/structuredstreaming-processing-data-based-on-new-events-in-kafka-topic
>
>
> tia!
>
-- 



view my Linkedin profile

https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.