Re: Spark Structured Streaming and Flask REST API for Real-Time Data Ingestion and Analytics.

2024-01-09 Thread Mich Talebzadeh
Hi Ashok,

Thanks for pointing out the Databricks article Scalable Spark Structured
Streaming for REST API Destinations | Databricks Blog


I browsed it and it is broadly similar to what many of us do with Spark
Structured Streaming and *foreachBatch*. That article and mine both mention a
REST API as part of the architecture. However, there are notable differences,
I believe.

In my proposed approach:

   1. Event-Driven Model:


   - Spark Streaming waits until Flask REST API makes a request for events
   to be generated within PySpark.
   - Messages are generated and then fed into any sink based on the Flask
   REST API's request.
   - This creates a more event-driven model where Spark generates data when
   prompted by external requests.





In the Databricks article scenario:

Continuous Data Stream:

   - There is an incoming stream of data from sources like Kafka, AWS
   Kinesis, or Azure Event Hub handled by foreachBatch
   - As messages flow off this stream, calls are made to a REST API with
   some or all of the message data.
   - This suggests a continuous flow of data where messages are sent to a
   REST API as soon as they are available in the streaming source.
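
For concreteness, here is a minimal, hedged PySpark sketch of that continuous pattern (not the article's own code): a Kafka-sourced stream where foreachBatch posts each micro-batch to a REST endpoint. The broker, topic, endpoint URL and payload shape are all assumptions for illustration.

```
import requests
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("restSinkSketch").getOrCreate()

def post_batch_to_rest(batch_df, batch_id):
    # For a small demo batch collect() is fine; a real job would POST from the
    # executors with batch_df.foreachPartition instead.
    for row in batch_df.select("value").collect():
        requests.post("https://example.com/ingest",        # assumed endpoint
                      json={"payload": row["value"]},
                      timeout=5)

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")    # assumed broker
      .option("subscribe", "events")                       # assumed topic
      .load()
      .selectExpr("CAST(value AS STRING) AS value"))

query = (df.writeStream
         .foreachBatch(post_batch_to_rest)
         .option("checkpointLocation", "/tmp/rest_sink_chk")
         .start())
query.awaitTermination()
```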


*Benefits of Event-Driven Model:*


   1. Responsiveness: Ideal for scenarios where data generation needs to be
   aligned with specific events or user actions.
   2. Resource Optimization: Can reduce resource consumption by processing
   data only when needed.
   3. Flexibility: Allows for dynamic control over data generation based on
   external triggers.

*Benefits of Continuous Data Stream Mode with foreachBatch:*

   1. Real-Time Processing: Facilitates immediate analysis and action on
   incoming data.
   2. Handling High Volumes: Well-suited for scenarios with
   continuous, high-volume data streams.
   3. Low-Latency Applications: Essential for applications requiring near
   real-time responses.

*Potential Use Cases for my approach:*

   - On-Demand Data Generation: Generating data for
   simulations, reports, or visualizations based on user requests.
   - Triggered Analytics: Executing specific analytics tasks only when
   certain events occur, such as detecting anomalies or breaching thresholds
   (say, fraud detection).
   - Custom ETL Processes: Facilitating data
   extraction, transformation, and loading workflows based on external events
   or triggers.


Something to note on latency: event-driven models like mine can introduce
slight additional latency compared to continuous processing, as data
generation waits for API calls.

So my approach is more event-triggered and responsive to external requests,
while the foreachBatch scenario is more continuous and real-time, processing
and sending data as soon as it becomes available.

In summary, both approaches have their merits and are suited to different
use cases depending on the nature of the data flow and processing
requirements.
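
To make the contrast concrete, a toy, hedged sketch of the event-driven side (illustrative only, not the forthcoming GitHub code): a Flask endpoint that, when called, asks Spark to generate a batch of synthetic events and append them to a landing path that any downstream sink or streaming query can consume. Paths, schema and row counts are assumptions.

```
import random
import time

from flask import Flask, jsonify, request
from pyspark.sql import SparkSession

app = Flask(__name__)
spark = SparkSession.builder.appName("eventDrivenSketch").getOrCreate()

LANDING_DIR = "/tmp/generated_events"   # assumed landing path for downstream consumers

@app.route("/generate", methods=["POST"])
def generate_events():
    # Data is generated only when an external request arrives (event-driven model)
    n = int(request.args.get("rows", 10))
    rows = [(i, random.random(), int(time.time())) for i in range(n)]
    df = spark.createDataFrame(rows, ["id", "value", "ts"])
    df.write.mode("append").parquet(LANDING_DIR)
    return jsonify({"generated": n})

if __name__ == "__main__":
    app.run(port=8080)   # Flask dev server; see the Gunicorn note further down the thread
```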

Cheers

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 9 Jan 2024 at 19:11, ashok34...@yahoo.com 
wrote:

> Hey Mich,
>
> Thanks for this introduction on your forthcoming proposal "Spark
> Structured Streaming and Flask REST API for Real-Time Data Ingestion and
> Analytics". I recently came across an article by Databricks with title 
> Scalable
> Spark Structured Streaming for REST API Destinations
> 
> . Their use case is similar to your suggestion but what they are saying
> is that they have incoming stream of data from sources like Kafka, AWS
> Kinesis, or Azure Event Hub. In other words, a continuous flow of data
> where messages are sent to a REST API as soon as they are available in the
> streaming source. Their approach is practical but wanted to get your
> thoughts on their article with a better understanding on your proposal and
> differences.
>
> Thanks
>
>
> On Tuesday, 9 January 2024 at 00:24:19 GMT, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>
> Please also note that Flask, by default, is a single-threaded web
> framework. While it is suitable for development and small-scale
> applications, it may not handle concurrent requests efficiently in a
> production environment.
> In production, one can utilise Gunicorn (Green Unicorn) which is a WSGI (
> Web Server Gateway Interface) that is commonly used to serve Flask
> applications in production.

Re: Spark Structured Streaming and Flask REST API for Real-Time Data Ingestion and Analytics.

2024-01-09 Thread ashok34...@yahoo.com.INVALID
 Hey Mich,
Thanks for this introduction on your forthcoming proposal "Spark Structured 
Streaming and Flask REST API for Real-Time Data Ingestion and Analytics". I 
recently came across an article by Databricks titled Scalable Spark
Structured Streaming for REST API Destinations. Their use case is similar to
your suggestion, but what they are saying is that they have an incoming stream
of data from sources like Kafka, AWS Kinesis, or Azure Event Hub. In other
words, a continuous flow of data where messages are sent to a REST API as soon
as they are available in the streaming source. Their approach is practical,
but I wanted to get your thoughts on their article to better understand your
proposal and the differences.
Thanks

On Tuesday, 9 January 2024 at 00:24:19 GMT, Mich Talebzadeh 
 wrote:  
 
 Please also note that Flask, by default, is a single-threaded web framework.
While it is suitable for development and small-scale applications, it may not
handle concurrent requests efficiently in a production environment. In
production, one can utilise Gunicorn (Green Unicorn) which is a WSGI (Web
Server Gateway Interface) that is commonly used to serve Flask applications in
production. It provides multiple worker processes, each capable of handling a
single request at a time. This makes Gunicorn suitable for handling multiple
simultaneous requests and improves the concurrency and performance of your
Flask application.

HTH
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom



   view my Linkedin profile




 https://en.everybodywiki.com/Mich_Talebzadeh

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss,
damage or destruction of data or any other property which may arise from relying
on this email's technical content is explicitly disclaimed. The author will in
no case be liable for any monetary damages arising from such loss, damage or
destruction.

 


On Mon, 8 Jan 2024 at 19:30, Mich Talebzadeh  wrote:

Thought it might be useful to share my idea with fellow forum members.  During 
the breaks, I worked on the seamless integration of Spark Structured Streaming 
with Flask REST API for real-time data ingestion and analytics. The use case 
revolves around a scenario where data is generated through REST API requests in 
real time. The Flask REST API efficiently captures and processes this data, 
saving it to a Spark Structured Streaming DataFrame. Subsequently, the 
processed data could be channelled into any sink of your choice including Kafka 
pipeline, showing a robust end-to-end solution for dynamic and responsive data 
streaming. I will delve into the architecture, implementation, and benefits of 
this combination, enabling one to build an agile and efficient real-time data 
application. I will put the code in GitHub for everyone's benefit. Hopefully 
your comments will help me to improve it.
Cheers
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom



   view my Linkedin profile




 https://en.everybodywiki.com/Mich_Talebzadeh

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss,
damage or destruction of data or any other property which may arise from relying
on this email's technical content is explicitly disclaimed. The author will in
no case be liable for any monetary damages arising from such loss, damage or
destruction.

 

  

Re: Spark Structured Streaming and Flask REST API for Real-Time Data Ingestion and Analytics.

2024-01-08 Thread Mich Talebzadeh
Please also note that Flask, by default, is a single-threaded web
framework. While it is suitable for development and small-scale
applications, it may not handle concurrent requests efficiently in a
production environment.
In production, one can utilise Gunicorn (Green Unicorn) which is a WSGI (
Web Server Gateway Interface) that is commonly used to serve Flask
applications in production. It provides multiple worker processes, each
capable of handling a single request at a time. This makes Gunicorn
suitable for handling multiple simultaneous requests and improves the
concurrency and performance of your Flask application.
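
As a hedged illustration of the above (module and route names are assumptions), a minimal Flask app and the typical Gunicorn invocation:

```
# app.py -- minimal Flask application used only to illustrate the point
from flask import Flask

app = Flask(__name__)

@app.route("/health")
def health():
    return "ok"

# Development:  python app.py            (single-threaded built-in server)
# Production:   gunicorn --workers 4 --bind 0.0.0.0:8000 app:app
#               (four worker processes, each handling one request at a time)
if __name__ == "__main__":
    app.run(port=8000)
```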

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 8 Jan 2024 at 19:30, Mich Talebzadeh 
wrote:

> Thought it might be useful to share my idea with fellow forum members.  During
> the breaks, I worked on the *seamless integration of Spark Structured
> Streaming with Flask REST API for real-time data ingestion and analytics*.
> The use case revolves around a scenario where data is generated through
> REST API requests in real time. The Flask REST API efficiently
> captures and processes this data, saving it to a Spark Structured Streaming
> DataFrame. Subsequently, the processed data could be channelled into any
> sink of your choice including Kafka pipeline, showing a robust end-to-end
> solution for dynamic and responsive data streaming. I will delve into the
> architecture, implementation, and benefits of this combination, enabling
> one to build an agile and efficient real-time data application. I will put
> the code in GitHub for everyone's benefit. Hopefully your comments will
> help me to improve it.
>
> Cheers
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Spark structured streaming tab is missing from spark web UI

2023-11-24 Thread Jungtaek Lim
The feature was added in Spark 3.0. Btw, you may want to check out the EOL
date for Apache Spark releases - https://endoflife.date/apache-spark - 2.x is
already EOLed.


On Fri, Nov 24, 2023 at 11:13 PM mallesh j 
wrote:

> Hi Team,
>
> I am trying to test the performance of a spark streaming application that
> I wrote which reads/writes data to Kafka. Code is working fine but I cannot
> see the Streaming tab in the UI. I tried enabling it by adding the below config
> to spark conf but still no luck. I have even checked Google/Stack
> Overflow on this but did not find an answer. So can you please check and let me
> know whether it is present or not, and how I can enable it?
>
> Attached is the screenshot for the same
>
> Spark version 2.4
> Scala version 2.11
>
>
> Thanks & Regards
>  Mallesh Jogu,
> + 919493390341.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: [Spark Structured Streaming]: Dynamic Scaling of Executors

2023-05-29 Thread Aishwarya Panicker
Hi,


Thanks for your response.


I understand there is no explicit way to configure dynamic scaling for
Spark Structured Streaming, as the ticket for that is still open. But is
there a way to manage dynamic scaling with the existing batch dynamic
scaling algorithm, since that is what kicks in when dynamic allocation is
enabled with Structured Streaming? The issue I’m facing with batch dynamic
allocation is that it requests executors based on pending/running tasks. And
to have parallelism we have set spark.sql.shuffle.partitions: "100", due to
which 100 partitions are created and thus 100 tasks, which causes more
executors to be requested (not scaling based on load). Is there a mechanism to
control this autoscaling behaviour of executors based on data load?


Additionally, the Spark Streaming dynamic allocation algorithm autoscales
executors based on the processing time / batch interval ratio, which would be
the preferred method for a streaming use case. So is there a provision to use
the streaming configurations instead of the batch mode configurations with
Structured Streaming?


Any suggestions on the above would be helpful.


Thanks and Regards,

Aishwarya


On Thu, 25 May, 2023, 11:46 PM Mich Talebzadeh, 
wrote:

> Hi,
> Autoscaling is not compatible with Spark Structured Streaming, since
> Spark Structured Streaming currently does not support dynamic allocation
> (see SPARK-24815: Structured Streaming should support dynamic allocation).
>
> That ticket is still open
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 25 May 2023 at 18:44, Aishwarya Panicker <
> aishwaryapanicke...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I have been working on Spark Structured Streaming and trying to autoscale
>> our application through dynamic allocation. But I couldn't find any
>> documentation or configurations that supports dynamic scaling in Spark
>> Structured Streaming, due to which I had been using Spark Batch mode
>> dynamic scaling which is not so efficient with streaming use case.
>>
>> I also tried with Spark streaming dynamic allocation configurations which
>> didn't work with structured streaming.
>>
>> Below are the configurations I tried for dynamic scaling of my Spark
>> Structured Streaming Application:
>>
>> With Batch Spark configurations:
>>
>> spark.dynamicAllocation.enabled: true
>> spark.dynamicAllocation.executorAllocationRatio: 0.5
>> spark.dynamicAllocation.minExecutors: 1
>> spark.dynamicAllocation.maxExecutors: 5
>>
>>
>> With Streaming Spark configurations:
>>
>> spark.dynamicAllocation.enabled: false
>> spark.streaming.dynamicAllocation.enabled: true
>> spark.streaming.dynamicAllocation.scaleUpRatio: 0.7
>> spark.streaming.dynamicAllocation.scaleDownRatio: 0.2
>> spark.streaming.dynamicAllocation.minExecutors: 1
>> spark.streaming.dynamicAllocation.maxExecutors: 5
>>
>> Kindly let me know if there is any configuration for the dynamic
>> allocation of Spark Structured Streaming which I'm missing due to which
>> autoscaling of my application is not working properly.
>>
>> Awaiting your response.
>>
>> Thanks and Regards,
>> Aishwarya
>>
>>
>>
>>
>>


Re: [Spark Structured Streaming]: Dynamic Scaling of Executors

2023-05-25 Thread Mich Talebzadeh
Hi,
Autoscaling is not compatible with Spark Structured Streaming, since
Spark Structured Streaming currently does not support dynamic allocation
(see SPARK-24815: Structured Streaming should support dynamic allocation).

That ticket is still open

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 25 May 2023 at 18:44, Aishwarya Panicker <
aishwaryapanicke...@gmail.com> wrote:

> Hi Team,
>
> I have been working on Spark Structured Streaming and trying to autoscale
> our application through dynamic allocation. But I couldn't find any
> documentation or configurations that supports dynamic scaling in Spark
> Structured Streaming, due to which I had been using Spark Batch mode
> dynamic scaling which is not so efficient with streaming use case.
>
> I also tried with Spark streaming dynamic allocation configurations which
> didn't work with structured streaming.
>
> Below are the configurations I tried for dynamic scaling of my Spark
> Structured Streaming Application:
>
> With Batch Spark configurations:
>
> spark.dynamicAllocation.enabled: true
> spark.dynamicAllocation.executorAllocationRatio: 0.5
> spark.dynamicAllocation.minExecutors: 1
> spark.dynamicAllocation.maxExecutors: 5
>
>
> With Streaming Spark configurations:
>
> spark.dynamicAllocation.enabled: false
> spark.streaming.dynamicAllocation.enabled: true
> spark.streaming.dynamicAllocation.scaleUpRatio: 0.7
> spark.streaming.dynamicAllocation.scaleDownRatio: 0.2
> spark.streaming.dynamicAllocation.minExecutors: 1
> spark.streaming.dynamicAllocation.maxExecutors: 5
>
> Kindly let me know if there is any configuration for the dynamic
> allocation of Spark Structured Streaming which I'm missing due to which
> autoscaling of my application is not working properly.
>
> Awaiting your response.
>
> Thanks and Regards,
> Aishwarya
>
>
>
>
>


Re: [Spark Structured Streaming] Could we apply new options of readStream/writeStream without stopping spark application (zero downtime)?

2023-03-09 Thread hueiyuan su
Dear Mich,

Sure, that is a good idea. If we had a pause() function, we could
temporarily pause streaming and adjust the configuration, maybe from an
environment variable.
Once these parameters are adjusted, we could resume the stream to apply the
newest parameters without stopping the Spark streaming application.

Mich Talebzadeh  於 2023年3月10日 週五 上午12:26寫道:

> most probably we will require an  additional method pause()
>
>
> https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html
>
> to allow us to pause (as opposed to stop()) the streaming process and
> resume after changing the parameters. The state of streaming needs to be
> preserved.
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 7 Mar 2023 at 17:25, Mich Talebzadeh 
> wrote:
>
>> hm interesting proposition. I guess you mean altering one of following
>> parameters in flight
>>
>>
>>   streamingDataFrame = self.spark \
>> .readStream \
>> .format("kafka") \
>> .option("kafka.bootstrap.servers",
>> config['MDVariables']['bootstrapServers'],) \
>> .option("schema.registry.url",
>> config['MDVariables']['schemaRegistryURL']) \
>> .option("group.id", config['common']['appName']) \
>> .option("zookeeper.connection.timeout.ms",
>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>> .option("rebalance.backoff.ms",
>> config['MDVariables']['rebalanceBackoffMS']) \
>> .option("zookeeper.session.timeout.ms",
>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>> .option("auto.commit.interval.ms",
>> config['MDVariables']['autoCommitIntervalMS']) \
>> .option("subscribe", config['MDVariables']['topic']) \
>> .option("failOnDataLoss", "false") \
>> .option("includeHeaders", "true") \
>> .option("startingOffsets", "latest") \
>> .load() \
>> .select(from_json(col("value").cast("string"),
>> schema).alias("parsed_value"))
>>
>> OK, one secure way of doing it is through shutting down the streaming process
>> gracefully, without loss of data that impacts consumers. The other method
>> implies in-flight changes, as suggested by the topic, with zero interruptions.
>> Interestingly, one of our clients requested a similar solution. As solutions
>> architect/engineering manager I should come back with a few options. I am on
>> the case, so to speak. There is considerable interest in Spark Structured
>> Streaming across the board, especially in trading systems.
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 16 Feb 2023 at 04:12, hueiyuan su  wrote:
>>
>>> *Component*: Spark Structured Streaming
>>> *Level*: Advanced
>>> *Scenario*: How-to
>>>
>>> -
>>> *Problems Description*
>>> I would like to confirm could we directly apply new options of
>>> readStream/writeStream without stopping current running spark structured
>>> streaming applications? For example, if we just want to adjust throughput
>>> properties of readStream with kafka. Do we have method can just adjust it
>>> without stopping application? If you have any ideas, please let me know. I
>>> will be appreciate it and your answer.
>>>
>>>
>>> --
>>> Best Regards,
>>>
>>> Mars Su
>>> *Phone*: 0988-661-013
>>> *Email*: hueiyua...@gmail.com
>>>
>>

-- 
Best Regards,

Mars Su
*Phone*: 0988-661-013
*Email*: hueiyua...@gmail.com


Re: [Spark Structured Streaming] Could we apply new options of readStream/writeStream without stopping spark application (zero downtime)?

2023-03-09 Thread Mich Talebzadeh
most probably we will require an  additional method pause()

https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.streaming.StreamingQuery.html

to allow us to pause (as opposed to stop()) the streaming process and
resume after changing the parameters. The state of streaming needs to be
preserved.
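
There is no pause() in the current StreamingQuery API, only stop(), so until something like that exists the usual workaround is to stop the query gracefully and start a new one with the changed options, letting the checkpoint carry the offsets/state forward. A hedged sketch, assuming a Kafka source and that the changed option (here maxOffsetsPerTrigger, a rate limit) is one Spark permits to differ across restarts:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("restartWithNewOptions").getOrCreate()

def start_query(max_offsets):
    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")   # assumed broker
          .option("subscribe", "md")                          # assumed topic
          .option("maxOffsetsPerTrigger", max_offsets)
          .load())
    return (df.writeStream
            .format("console")
            .option("checkpointLocation", "/tmp/md_chk")      # same checkpoint every time
            .start())

query = start_query(10000)
# ... later, once the new throughput setting has been decided ...
query.stop()                # committed offsets/state stay in the checkpoint
query = start_query(50000)  # restart; processing resumes from the checkpointed offsets
query.awaitTermination()
```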

HTH



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 7 Mar 2023 at 17:25, Mich Talebzadeh 
wrote:

> hm interesting proposition. I guess you mean altering one of following
> parameters in flight
>
>
>   streamingDataFrame = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['appName']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", config['MDVariables']['topic']) \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
> OK, one secure way of doing it is through shutting down the streaming process
> gracefully, without loss of data that impacts consumers. The other method
> implies in-flight changes, as suggested by the topic, with zero interruptions.
> Interestingly, one of our clients requested a similar solution. As solutions
> architect/engineering manager I should come back with a few options. I am on
> the case, so to speak. There is considerable interest in Spark Structured
> Streaming across the board, especially in trading systems.
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 16 Feb 2023 at 04:12, hueiyuan su  wrote:
>
>> *Component*: Spark Structured Streaming
>> *Level*: Advanced
>> *Scenario*: How-to
>>
>> -
>> *Problems Description*
>> I would like to confirm could we directly apply new options of
>> readStream/writeStream without stopping current running spark structured
>> streaming applications? For example, if we just want to adjust throughput
>> properties of readStream with kafka. Do we have method can just adjust it
>> without stopping application? If you have any ideas, please let me know. I
>> will be appreciate it and your answer.
>>
>>
>> --
>> Best Regards,
>>
>> Mars Su
>> *Phone*: 0988-661-013
>> *Email*: hueiyua...@gmail.com
>>
>


Re: [Spark Structured Streaming] Could we apply new options of readStream/writeStream without stopping spark application (zero downtime)?

2023-03-07 Thread Mich Talebzadeh
hm interesting proposition. I guess you mean altering one of following
parameters in flight


  streamingDataFrame = self.spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers'],) \
.option("schema.registry.url",
config['MDVariables']['schemaRegistryURL']) \
.option("group.id", config['common']['appName']) \
.option("zookeeper.connection.timeout.ms",
config['MDVariables']['zookeeperConnectionTimeoutMs']) \
.option("rebalance.backoff.ms",
config['MDVariables']['rebalanceBackoffMS']) \
.option("zookeeper.session.timeout.ms",
config['MDVariables']['zookeeperSessionTimeOutMs']) \
.option("auto.commit.interval.ms",
config['MDVariables']['autoCommitIntervalMS']) \
.option("subscribe", config['MDVariables']['topic']) \
.option("failOnDataLoss", "false") \
.option("includeHeaders", "true") \
.option("startingOffsets", "latest") \
.load() \
.select(from_json(col("value").cast("string"),
schema).alias("parsed_value"))

OK, one secure way of doing it is through shutting down the streaming process
gracefully, without loss of data that impacts consumers. The other method
implies in-flight changes, as suggested by the topic, with zero interruptions.
Interestingly, one of our clients requested a similar solution. As solutions
architect/engineering manager I should come back with a few options. I am on
the case, so to speak. There is considerable interest in Spark Structured
Streaming across the board, especially in trading systems.
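
For the graceful-shutdown route, one commonly used pattern (a hedged sketch, not a blessed API; the marker-file path is an assumption) is to poll for an external stop signal between awaitTermination timeouts and only then call stop(), so the query is brought down cleanly rather than killed:

```
import os

STOP_FLAG = "/tmp/stop_streaming_job"   # assumed marker file written by an operator

def run_until_flagged(query, poll_seconds=30):
    # `query` is an already started StreamingQuery
    while query.isActive:
        # awaitTermination(timeout) returns True if the query ended, False on timeout
        if query.awaitTermination(poll_seconds):
            break
        if os.path.exists(STOP_FLAG):
            query.stop()                # committed offsets/state remain in the checkpoint
            break
```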

HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Thu, 16 Feb 2023 at 04:12, hueiyuan su  wrote:

> *Component*: Spark Structured Streaming
> *Level*: Advanced
> *Scenario*: How-to
>
> -
> *Problems Description*
> I would like to confirm could we directly apply new options of
> readStream/writeStream without stopping current running spark structured
> streaming applications? For example, if we just want to adjust throughput
> properties of readStream with kafka. Do we have method can just adjust it
> without stopping application? If you have any ideas, please let me know. I
> will be appreciate it and your answer.
>
>
> --
> Best Regards,
>
> Mars Su
> *Phone*: 0988-661-013
> *Email*: hueiyua...@gmail.com
>


Re: [Spark Structured Streaming] Do spark structured streaming is support sink to AWS Kinesis currently and how to handle if achieve quotas of kinesis?

2023-03-06 Thread Mich Talebzadeh
Spark Structured Streaming can write to anything as long as an appropriate
API or JDBC connection exists.

I have not tried Kinesis, but have you thought about how you want to write
it as a sink?

Those quota limitations, much like quotas set by other vendors (say Google on
BigQuery writes etc.), are defaults but can be negotiated with the vendor to
increase them.

What facts have you established so far?
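
On the "how to write it as a sink" point, a hedged sketch of one option: foreachBatch plus the AWS SDK (boto3), chunking each PutRecords call to at most 500 records to stay within the service limits mentioned. Region, stream name and partition key are assumptions, and production code would also need retries for throttled records.

```
import json
import boto3

def write_batch_to_kinesis(batch_df, batch_id):
    # Runs on the driver each micro-batch; for large batches prefer
    # batch_df.foreachPartition so each executor has its own client.
    client = boto3.client("kinesis", region_name="us-east-1")      # assumed region
    rows = [r.asDict() for r in batch_df.collect()]
    for i in range(0, len(rows), 500):                             # PutRecords caps at 500 records/call
        records = [{"Data": json.dumps(r).encode("utf-8"),
                    "PartitionKey": str(r.get("id", batch_id))}
                   for r in rows[i:i + 500]]
        client.put_records(StreamName="my-stream", Records=records)  # assumed stream name

# usage sketch:
# df.writeStream.foreachBatch(write_batch_to_kinesis) \
#   .option("checkpointLocation", "/tmp/kinesis_chk").start()
```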

HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 6 Mar 2023 at 04:20, hueiyuan su  wrote:

> *Component*: Spark Structured Streaming
> *Level*: Advanced
> *Scenario*: How-to
>
> 
> *Problems Description*
> 1. I currently would like to use PySpark Structured Streaming to
> write data to Kinesis. But it seems there is no corresponding
> connector I can use. I would like to confirm whether there is another method
> in addition to this solution.
> 
> 2. Because AWS Kinesis has quota limitations (like 1 MB/s and 1000
> records/s), if the Spark Structured Streaming micro-batch size is too large, how
> can we handle this?
>
> --
> Best Regards,
>
> Mars Su
> *Phone*: 0988-661-013
> *Email*: hueiyua...@gmail.com
>


Re: [Spark Structured Streaming] Do spark structured streaming is support sink to AWS Kinesis currently?

2023-02-16 Thread Vikas Kumar
Doesn't directly answer your question but there are ways in scala and
pyspark - See if this helps:
https://repost.aws/questions/QUP_OJomilTO6oIgvK00VHEA/writing-data-to-kinesis-stream-from-py-spark

On Thu, Feb 16, 2023, 8:27 PM hueiyuan su  wrote:

> *Component*: Spark Structured Streaming
> *Level*: Advanced
> *Scenario*: How-to
>
> 
> *Problems Description*
> I would like to implement writeStream of data to AWS Kinesis with Spark
> Structured Streaming, but I cannot find a related connector jar that can be used.
> I want to check whether writing a stream to AWS Kinesis is fully supported. If you
> have any ideas, please let me know. I will appreciate your answer.
>
> --
> Best Regards,
>
> Mars Su
> *Phone*: 0988-661-013
> *Email*: hueiyua...@gmail.com
>


Re: [Spark Structured Streaming] Could we apply new options of readStream/writeStream without stopping spark application (zero downtime)?

2023-02-15 Thread Jack Goodson
Hi,

There is some good documentation under here

https://docs.databricks.com/structured-streaming/query-recovery.html


Under the “recovery after change in structured streaming query” heading
that gives good general guidelines on what can be changed in a “pause” of a
stream

On Thu, 16 Feb 2023 at 5:12 PM, hueiyuan su  wrote:

> *Component*: Spark Structured Streaming
> *Level*: Advanced
> *Scenario*: How-to
>
> -
> *Problems Description*
> I would like to confirm could we directly apply new options of
> readStream/writeStream without stopping current running spark structured
> streaming applications? For example, if we just want to adjust throughput
> properties of readStream with kafka. Do we have method can just adjust it
> without stopping application? If you have any ideas, please let me know. I
> will be appreciate it and your answer.
>
>
> --
> Best Regards,
>
> Mars Su
> *Phone*: 0988-661-013
> *Email*: hueiyua...@gmail.com
>


Re: [SPARK STRUCTURED STREAMING] : Rocks DB uses off-heap usage

2022-11-30 Thread Adam Binford
We started hitting this as well, seeing 90+ GB resident memory on a 25 GB
heap executor. After manually testing a lot of fixes, I finally figured out
the root problem: https://issues.apache.org/jira/browse/SPARK-41339

Starting to work on a PR now to fix.

On Mon, Sep 12, 2022 at 10:46 AM Artemis User 
wrote:

> The off-heap memory isn't subject to GC.  So the obvious reason is that
> you have too many states to maintain in your streaming app, the GC
> couldn't keep up, and the executors ran out of resources and died.  Are you using
> continuous processing or microbatch in structured streaming?  You may want
> to lower your incoming data rate and/or increase your microbatch size so as to
> lower the number of states to be persisted/maintained...
>
> On 9/11/22 10:59 AM, akshit marwah wrote:
>
> Hi Team,
>
> We are trying to shift from HDFS State Manager to Rocks DB State Manager,
> but while doing POC we realised it is using much more off-heap space than
> expected. Because of this, the executors get killed with  : *out of**
> physical memory exception.*
>
> Could you please help in understanding, why is there a massive increase in
> off-heap space, and what can we do about it?
>
> We are using, SPARK 3.2.1 with 1 executor and 1 executor core, to
> understand the memory requirements -
> 1. Rocks DB Run - took 3.5 GB heap and 11.5 GB Res Memory
> 2. Hdfs State Manager - took 5 GB heap and 10 GB Res Memory.
>
> Thanks,
> Akshit
>
>
> Thanks and regards
> - Akshit Marwah
>
>
>

-- 
Adam Binford


Re: Spark Structured Streaming - stderr getting filled up

2022-09-19 Thread karan alang
here is the stackoverflow link

https://stackoverflow.com/questions/73780259/spark-structured-streaming-stderr-getting-filled-up
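
On the second question quoted below (stderr growing without bound on long-running executors), one thing worth checking, as a hedged suggestion, is Spark's rolling executor-log settings (property names as per the Spark configuration docs), for example:

   spark.executor.logs.rolling.strategy: size
   spark.executor.logs.rolling.maxSize: 134217728
   spark.executor.logs.rolling.maxRetainedFiles: 5
   spark.executor.logs.rolling.enableCompression: true

That only bounds the executor stdout/stderr files themselves; reducing the per-record INFO/print noise in the job is likely the bigger lever.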

On Mon, Sep 19, 2022 at 4:41 PM karan alang  wrote:

> I've created a stackoverflow ticket for this as well
>
> On Mon, Sep 19, 2022 at 4:37 PM karan alang  wrote:
>
>> Hello All,
>> I've a Spark Structured Streaming job on GCP Dataproc - which picks up
>> data from Kafka, does processing and pushes data back into kafka topics.
>>
>> Couple of questions :
>> 1. Does Spark put all the log (incl. INFO, WARN etc) into stderr ?
>> What I notice is that stdout is empty, while all the logging is put in to
>> stderr
>>
>> 2. Is there a way for me to expire the data in stderr (i.e. expire the
>> older logs) ?
>> Since I've a long running streaming job, the stderr gets filled up over
>> time and nodes/VMs become unavailable.
>>
>> Pls advice.
>>
>> Here is output of the yarn logs command :
>> ```
>>
>> root@versa-structured-stream-v1-w-1:/home/karanalang# yarn logs
>> -applicationId application_1663623368960_0008 -log_files stderr -size -500
>>
>> 2022-09-19 23:26:01,439 INFO client.RMProxy: Connecting to
>> ResourceManager at versa-structured-stream-v1-m/10.142.0.62:8032
>>
>> 2022-09-19 23:26:01,696 INFO client.AHSProxy: Connecting to Application
>> History server at versa-structured-stream-v1-m/10.142.0.62:10200
>>
>> Can not find any log file matching the pattern: [stderr] for the
>> container: container_e01_1663623368960_0008_01_03 within the
>> application: application_1663623368960_0008
>>
>> Container: container_e01_1663623368960_0008_01_02 on
>> versa-structured-stream-v1-w-2.c.versa-sml-googl.internal:8026
>>
>> LogAggregationType: LOCAL
>>
>>
>> ===
>>
>> LogType:stderr
>>
>> LogLastModifiedTime:Mon Sep 19 23:26:02 + 2022
>>
>> LogLength:44309782124
>>
>> LogContents:
>>
>> , tenantId=3, vsnId=0, mstatsTotSentOctets=48210,
>> mstatsTotRecvdOctets=242351, mstatsTotSessDuration=30,
>> mstatsTotSessCount=34, mstatsType=dest-stats, destIp=165.225.216.24,
>> mstatsAttribs=,topic=syslog.ueba-us4.v1.versa.demo3,customer=versa  type(row)
>> is ->  
>>
>> 22/09/19 23:26:02 WARN
>> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer: KafkaDataConsumer
>> is not running in UninterruptibleThread. It may hang when
>> KafkaDataConsumer's methods are interrupted because of KAFKA-1894
>>
>> End of LogType:stderr.This log file belongs to a running container
>> (container_e01_1663623368960_0008_01_02) and so may not be complete.
>>
>> ***
>>
>>
>>
>> Container: container_e01_1663623368960_0008_01_01 on
>> versa-structured-stream-v1-w-1.c.versa-sml-googl.internal:8026
>>
>> LogAggregationType: LOCAL
>>
>>
>> ===
>>
>> LogType:stderr
>>
>> LogLastModifiedTime:Mon Sep 19 22:54:55 + 2022
>>
>> LogLength:17367929
>>
>> LogContents:
>>
>> on syslog.ueba-us4.v1.versa.demo3-2
>>
>> 22/09/19 22:52:52 INFO
>> org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer
>> clientId=consumer-spark-kafka-source-0f984ad9-f663-4ce1-9ef1-349419f3e6ec-1714963016-executor-1,
>> groupId=spark-kafka-source-0f984ad9-f663-4ce1-9ef1-349419f3e6ec-1714963016-executor]
>> Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset
>> 449568676.
>>
>> 22/09/19 22:54:55 ERROR
>> org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
>>
>> End of LogType:stderr.
>>
>> ***
>>
>> ```
>>
>>
>>


Re: Spark Structured Streaming - stderr getting filled up

2022-09-19 Thread karan alang
I've created a stackoverflow ticket for this as well

On Mon, Sep 19, 2022 at 4:37 PM karan alang  wrote:

> Hello All,
> I've a Spark Structured Streaming job on GCP Dataproc - which picks up
> data from Kafka, does processing and pushes data back into kafka topics.
>
> Couple of questions :
> 1. Does Spark put all the log (incl. INFO, WARN etc) into stderr ?
> What I notice is that stdout is empty, while all the logging is put in to
> stderr
>
> 2. Is there a way for me to expire the data in stderr (i.e. expire the
> older logs) ?
> Since I've a long running streaming job, the stderr gets filled up over
> time and nodes/VMs become unavailable.
>
> Pls advice.
>
> Here is output of the yarn logs command :
> ```
>
> root@versa-structured-stream-v1-w-1:/home/karanalang# yarn logs
> -applicationId application_1663623368960_0008 -log_files stderr -size -500
>
> 2022-09-19 23:26:01,439 INFO client.RMProxy: Connecting to ResourceManager
> at versa-structured-stream-v1-m/10.142.0.62:8032
>
> 2022-09-19 23:26:01,696 INFO client.AHSProxy: Connecting to Application
> History server at versa-structured-stream-v1-m/10.142.0.62:10200
>
> Can not find any log file matching the pattern: [stderr] for the
> container: container_e01_1663623368960_0008_01_03 within the
> application: application_1663623368960_0008
>
> Container: container_e01_1663623368960_0008_01_02 on
> versa-structured-stream-v1-w-2.c.versa-sml-googl.internal:8026
>
> LogAggregationType: LOCAL
>
>
> ===
>
> LogType:stderr
>
> LogLastModifiedTime:Mon Sep 19 23:26:02 + 2022
>
> LogLength:44309782124
>
> LogContents:
>
> , tenantId=3, vsnId=0, mstatsTotSentOctets=48210,
> mstatsTotRecvdOctets=242351, mstatsTotSessDuration=30,
> mstatsTotSessCount=34, mstatsType=dest-stats, destIp=165.225.216.24,
> mstatsAttribs=,topic=syslog.ueba-us4.v1.versa.demo3,customer=versa  type(row)
> is ->  
>
> 22/09/19 23:26:02 WARN
> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer: KafkaDataConsumer
> is not running in UninterruptibleThread. It may hang when
> KafkaDataConsumer's methods are interrupted because of KAFKA-1894
>
> End of LogType:stderr.This log file belongs to a running container
> (container_e01_1663623368960_0008_01_02) and so may not be complete.
>
> ***
>
>
>
> Container: container_e01_1663623368960_0008_01_01 on
> versa-structured-stream-v1-w-1.c.versa-sml-googl.internal:8026
>
> LogAggregationType: LOCAL
>
>
> ===
>
> LogType:stderr
>
> LogLastModifiedTime:Mon Sep 19 22:54:55 + 2022
>
> LogLength:17367929
>
> LogContents:
>
> on syslog.ueba-us4.v1.versa.demo3-2
>
> 22/09/19 22:52:52 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer
> clientId=consumer-spark-kafka-source-0f984ad9-f663-4ce1-9ef1-349419f3e6ec-1714963016-executor-1,
> groupId=spark-kafka-source-0f984ad9-f663-4ce1-9ef1-349419f3e6ec-1714963016-executor]
> Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset
> 449568676.
>
> 22/09/19 22:54:55 ERROR
> org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
>
> End of LogType:stderr.
>
> ***
>
> ```
>
>
>


Re: [SPARK STRUCTURED STREAMING] : Rocks DB uses off-heap usage

2022-09-12 Thread Artemis User
The off-heap memory isn't subject to GC.  So the obvious reason is
that you have too many states to maintain in your streaming app, the
GC couldn't keep up, and the executors ran out of resources and died. Are you
using continuous processing or microbatch in structured streaming?  You
may want to lower your incoming data rate and/or increase your
microbatch size so as to lower the number of states to be
persisted/maintained...
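
A hedged sketch of the knobs that advice maps to, switching the state store to RocksDB (available since Spark 3.2) and capping/pacing the micro-batches; broker, topic, values and paths are placeholders:

```
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("rocksdbStateSketch")
         # RocksDB-backed state store provider (Spark 3.2+)
         .config("spark.sql.streaming.stateStore.providerClass",
                 "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
         .getOrCreate())

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   # assumed broker
      .option("subscribe", "events")                      # assumed topic
      .option("maxOffsetsPerTrigger", 50000)              # cap the incoming rate per micro-batch
      .load())

# ... stateful aggregation goes here ...

query = (df.writeStream
         .format("console")
         .trigger(processingTime="1 minute")              # larger, less frequent micro-batches
         .option("checkpointLocation", "/tmp/rocksdb_chk")
         .start())
query.awaitTermination()
```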


On 9/11/22 10:59 AM, akshit marwah wrote:

Hi Team,

We are trying to shift from HDFS State Manager to Rocks DB State 
Manager, but while doing POC we realised it is using much more 
off-heap space than expected. Because of this, the executors get 
killed with an *out of physical memory exception*.

Could you please help in understanding, why is there a massive 
increase in off-heap space, and what can we do about it?


We are using, SPARK 3.2.1 with 1 executor and 1 executor core, to 
understand the memory requirements -

1. Rocks DB Run - took 3.5 GB heap and 11.5 GB Res Memory
2. Hdfs State Manager - took 5 GB heap and 10 GB Res Memory.

Thanks,
Akshit


Thanks and regards
- Akshit Marwah


Re: Spark Structured Streaming -- Cannot consume next messages

2022-07-21 Thread KhajaAsmath Mohammed
I was able to figure it out. The HDFS directory where the data is being pushed
was previously written by a different user. Not having the proper permissions
resulted in this issue.

Thanks,
Asmath

> On Jul 21, 2022, at 4:22 PM, Artemis User  wrote:
> 
> Not sure what you mean by offerts/offsets.  I assume you were using 
> file-based instead of Kafka-based data sources.  Are the incoming data 
> generated in mini-batch files or in a single large file?  Have you had this 
> type of problem before?
> 
>> On 7/21/22 1:02 PM, KhajaAsmath Mohammed wrote:
>> Hi,
>> 
>> I am seeing weird behavior in our spark structured streaming application 
>> where the offerts are not getting picked by the streaming  job.
>> 
>> If I delete the checkpoint directory and run the job again, I can see the 
>> data for the first batch but it is not picking up new offsets again from the 
>> next job when the job is running.
>> 
>> FYI, job is still running but it is not picking up new offsets. I am not 
>> able to figure out where the issue is in this case.
>> 
>> Thanks,
>> Asmath
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Structured Streaming -- Cannot consume next messages

2022-07-21 Thread Artemis User
Not sure what you mean by offerts/offsets.  I assume you were using 
file-based instead of Kafka-based data sources.  Are the incoming 
data generated in mini-batch files or in a single large file?  Have you 
had this type of problem before?


On 7/21/22 1:02 PM, KhajaAsmath Mohammed wrote:

Hi,

I am seeing weird behavior in our spark structured streaming 
application where the offerts are not getting picked by the streaming  
job.


If I delete the checkpoint directory and run the job again, I can see 
the data for the first batch but it is not picking up new offsets 
again from the next job when the job is running.


FYI, job is still running but it is not picking up new offsets. I am 
not able to figure out where the issue is in this case.


Thanks,
Asmath



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Structured Streaming org.apache.spark.sql.functions.input_file_name Intermittently Missing FileName

2021-10-12 Thread Alchemist
 Looks like it is somehow related to the API being unable to send data from executor to driver.
If I set spark master to local I get these 6 files:

When spark.master is local
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/d
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/a
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/b
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/g
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/e
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/f
  &  InputReportAndFileName fileName

If I set spark master local[*] I get these file...
  &  InputReportAndFileName fileName
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/b
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/e
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/d
  &  InputReportAndFileName fileName
  &  InputReportAndFileName fileName
  &  InputReportAndFileName fileName

On Tuesday, October 12, 2021, 05:23:45 AM PDT, Alchemist 
 wrote:  
 
  Here is Spark's API definition; I am unable to understand what it means to
have an "unknown" file.  We are processing files, so we should have a fileName; I have 7
files and it can print 3 and miss the other 4.

    /**
     * Returns the holding file name or empty string if it is unknown.
     */
    def getInputFilePath: UTF8String = inputBlock.get().get().filePath

Can anyone help me understand what it means for the file name to be unknown, hence
the above API returning the blank filenames below?
On Monday, October 11, 2021, 08:43:42 PM PDT, Alchemist 
 wrote:  
 
 Hello all,
I am trying to extract file name like following but intermittently we are 
getting empty file name.

Step 1: Get Schema

    StructType jsonSchema = sparkSession.read()
        .option("multiLine", true)
        .json("src/main/resources/sample.json")
        .schema();

Step 2: Get Input DataSet

    Dataset inputDS = sparkSession
        .readStream()
        .format("text")
        .option("multiLine", true)
        .schema(jsonSchema)
        .json(inputPath + "/*");

Step 3: Add fileName column

    Dataset inputDf = inputDS.select(functions.col("Report")).toJSON()
        .withColumn("FileName", org.apache.spark.sql.functions.input_file_name());

Step 4: Print fileName

    Dataset inputDF = inputDf
        .as(ExpressionEncoder.javaBean(InputReportAndFileName.class))
        .map((MapFunction) inputReportAndFileName -> {
            System.out.println("&  InputReportAndFileName fileName " + inputReportAndFileName.getFileName());
            return inputReportAndFileName;
        }, ExpressionEncoder.javaBean(InputReportAndFileName.class));

Output: Here we see missing fileName
  &  InputReportAndFileName fileName
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_1420254%202
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_14202040
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_142720%202
  &  InputReportAndFileName fileName
  &  InputReportAndFileName fileName
  &  InputReportAndFileName fileName


Re: Spark Structured Streaming org.apache.spark.sql.functions.input_file_name Intermittently Missing FileName

2021-10-12 Thread Alchemist
 Here is Spark's API definition; I am unable to understand what it means to have
an "unknown" file.  We are processing files, so we should have a fileName; I have 7
files and it can print 3 and miss the other 4.

    /**
     * Returns the holding file name or empty string if it is unknown.
     */
    def getInputFilePath: UTF8String = inputBlock.get().get().filePath

Can anyone help me understand what it means for the file name to be unknown, hence
the above API returning the blank filenames below?
On Monday, October 11, 2021, 08:43:42 PM PDT, Alchemist 
 wrote:  
 
 Hello all,
I am trying to extract file name like following but intermittently we are 
getting empty file name.

Step 1: Get Schema

    StructType jsonSchema = sparkSession.read()
        .option("multiLine", true)
        .json("src/main/resources/sample.json")
        .schema();

Step 2: Get Input DataSet

    Dataset inputDS = sparkSession
        .readStream()
        .format("text")
        .option("multiLine", true)
        .schema(jsonSchema)
        .json(inputPath + "/*");

Step 3: Add fileName column

    Dataset inputDf = inputDS.select(functions.col("Report")).toJSON()
        .withColumn("FileName", org.apache.spark.sql.functions.input_file_name());

Step 4: Print fileName

    Dataset inputDF = inputDf
        .as(ExpressionEncoder.javaBean(InputReportAndFileName.class))
        .map((MapFunction) inputReportAndFileName -> {
            System.out.println("&  InputReportAndFileName fileName " + inputReportAndFileName.getFileName());
            return inputReportAndFileName;
        }, ExpressionEncoder.javaBean(InputReportAndFileName.class));

Output: Here we see missing fileName
  &  InputReportAndFileName fileName
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_1420254%202
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_14202040
  &  InputReportAndFileName fileName file:///Users/abc/Desktop/test/Streaming/2021-Aug-14-042000_001E46_142720%202
  &  InputReportAndFileName fileName
  &  InputReportAndFileName fileName
  &  InputReportAndFileName fileName
  

Re: Spark Structured Streaming Continuous Trigger on multiple sinks

2021-09-12 Thread Alex Ott
Just don't call .awaitTermindation() because it blocks execution of the
next line of code. You can assign result of .start() to a specific
variable, or put them into list/array.

And to wait until one of the streams finishes, use
spark.streams.awaitAnyTermination() or something like this
(https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries)
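
A hedged PySpark sketch of that non-blocking pattern (dataset1, the Mongo writer object, broker, topic and checkpoint paths are placeholders; continuous-trigger support differs per sink, so the trigger is shown only on the Kafka query):

```
# `spark` is the active SparkSession and `dataset1` a streaming DataFrame.
kafka_q = (dataset1.writeStream
           .format("kafka")
           .option("kafka.bootstrap.servers", "broker:9092")
           .option("topic", "out-topic")
           .option("checkpointLocation", "/tmp/chk_kafka")
           .trigger(continuous="1 second")
           .start())                      # start() returns immediately, it does not block

mongo_q = (dataset1.writeStream
           .foreach(MongoRowWriter())     # placeholder for the existing ForeachWriter logic
           .option("checkpointLocation", "/tmp/chk_mongo")
           .start())                      # second query runs concurrently

spark.streams.awaitAnyTermination()       # block until any of the active queries terminates
```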
 

S  at "Wed, 25 Aug 2021 14:14:48 +0530" wrote:
 S> Hello,

 S> I have a structured streaming job that needs to be able to write to 
multiple sinks. We are using Continuous Trigger and not Microbatch Trigger. 

 S> 1. When we use the foreach method using:
 S> dataset1.writeStream.foreach(kafka ForEachWriter 
logic).trigger(ContinuousMode).start().awaitTermination() 
 S> dataset1.writeStream.foreach(mongo ForEachWriter 
logic).trigger(ContinuousMode).start().awaitTermination() 
 S> The first statement blocks the second one for obvious reasons. So this does 
not serve our purpose.
 S> 2. The next step for this problem would be to use the foreachbatch. That is 
not supported in the ContinuousMode.
 S> 3. The next step was to use something like this 
 S> 
dataset1.writeStream.format("kafka").trigger(ContinuousMode).start().awaitTermination()
 
 S> 
dataset1.writeStream.format("mongo").trigger(ContinuousMode).start().awaitTermination()
 S> for both the sinks. This does not work either. Only the 1st query works. 
The second one does not.

 S> Is there any solution to the problem of being able to write to multiple 
sinks in Continuous Trigger Mode using Structured Streaming?



-- 
With best wishes,Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread ayan guha
Hi

Option 1: You can write back to the processed queue and add some additional
info, like the last time tried and a counter.

Option 2: Load unpaired transactions into a staging table in Postgres. Modify
the streaming job of the "created" flow to try to pair any unpaired trx and
clear them by moving them to the main table. You may need to create a batch
job to delete records from the staging table, removing unpaired records which
pass a certain age.
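
A hedged sketch of the staging-table step in option 2, using foreachBatch and a plain JDBC append (connection details and table name are assumptions; the pairing and age-out logic would live in the database or a follow-up batch job):

```
PG_URL = "jdbc:postgresql://pg-host:5432/payments"        # assumed connection details
PG_PROPS = {"user": "spark", "password": "***", "driver": "org.postgresql.Driver"}

def stage_unpaired(batch_df, batch_id):
    # Transactions that could not be paired in this micro-batch are parked in a
    # staging table; a later job pairs them or ages them out.
    (batch_df.write
     .mode("append")
     .jdbc(PG_URL, "staging_unpaired_transactions", properties=PG_PROPS))

# usage sketch:
# unpaired_df.writeStream.foreachBatch(stage_unpaired) \
#     .option("checkpointLocation", "/tmp/staging_chk").start()
```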

Ayan

On Sat, 10 Jul 2021 at 9:49 am, Mich Talebzadeh 
wrote:

> One alternative I can think of is that you publish your orphaned
> transactions to another topic from the main Spark job
>
> You create a new DF based on orphaned transactions
>
> result = orphanedDF \
> ..
> .writeStream \
>  .outputMode('complete') \
>  .format("kafka") \
>  .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
>  .option("topic", "orphaned") \
>  .option('checkpointLocation', checkpoint_path) \
>  .queryName("orphanedTransactions") \
>  .start()
>
>
> And consume it somewhere else
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 10 Jul 2021 at 00:36, Bruno Oliveira 
> wrote:
>
>> I mean... I guess?
>>
>> But I don't really have Airflow here, and I didn't really want to fall
>> back to a "batch"-kinda approach with Airflow
>>
>> I'd rather use a Dead Letter Queue approach instead (like I mentioned
>> another topic for the failed ones, which is later consumed and pumps
>> the messages back to the original topic),
>> or something with Spark+Delta Lake instead...
>>
>> I was just hoping I could somewhat just retry/replay these "orphaned"
>> transactions somewhat easier...
>>
>> *Question) *Those features of "Stateful Streaming" or "Continuous
>> Processing" mode wouldn't help solve my case, would they?
>>
>> On Fri, Jul 9, 2021 at 8:19 PM Mich Talebzadeh 
>> wrote:
>>
>>> Well this is a matter of using journal entries.
>>>
>>> What you can do is that those "orphaned" transactions that you cannot
>>> pair through transaction_id can be written to a journal table in your
>>> Postgres DB. Then you can pair them with the entries in the relevant
>>> Postgres table. If the essence is not time critical this can be done
>>> through a scheduling job every x minutes through airflow or something
>>> similar on the database alone.
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 9 Jul 2021 at 23:53, Bruno Oliveira 
>>> wrote:
>>>
 That is exactly the case, Sebastian!

 - In practise, that "created  means "*authorized*", but I still cannot
 deduct anything from the customer balance
 - the "processed" means I can safely deduct the transaction_amount
 from the customer balance,
 - and the "refunded" means I must give the transaction amount back to
 the customer balance

 So, technically, we cannot process something that is not "AUTHORIZED"
 (created) yet, nor can we process a refund for a transaction that has NOT
 been PROCESSED yet.


 *You have an authorisation, then the actual transaction and maybe a
> refund some time in the future. You want to proceed with a transaction 
> only
> if you've seen the auth but in an eventually consistent system this might
> not always happen.*


 That's absolutely the case! So, yes, That's correct.

 *You are asking in the case of receiving the transaction before the
> auth how to retry later? *


 Yeah! I'm struggling for days on how to solve with Spark Structured
 Streaming...

 *Right now you are discarding those transactions that didn't match so
> you instead would need to persist them somewhere and either reinject them
> into the job that does lookup (say after x minutes) *



 *Right now, the best I could think of is: *

- Say, I'm reading the messages w/ transaction_id [1, 2, 3] from
Kafka (topic "transactions-processed")
- Then 

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Mich Talebzadeh
One alternative I can think of is that you publish your orphaned
transactions to another topic from the main Spark job

You create a new DF based on orphaned transactions

# note: the Kafka sink expects a "value" column (string or binary) on orphanedDF,
# and 'append' is the appropriate output mode here since there is no streaming aggregation
result = orphanedDF \
..
.writeStream \
 .outputMode('append') \
 .format("kafka") \
 .option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers']) \
 .option("topic", "orphaned") \
 .option('checkpointLocation', checkpoint_path) \
 .queryName("orphanedTransactions") \
 .start()


And consume it somewhere else
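For completeness, a minimal sketch of such a downstream consumer (the broker address, topic names and checkpoint path are placeholders, not taken from the thread): it reads the "orphaned" topic back in and republishes the payload to the original topic, so the main lookup job picks those transactions up again on a later micro-batch. Running it as a separate small application keeps the retry traffic isolated from the main job's checkpoint.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("orphanedReplay").getOrCreate()

# read the dead-letter ("orphaned") topic back in; broker address is a placeholder
orphaned = (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("subscribe", "orphaned")
            .option("startingOffsets", "latest")
            .load())

# the Kafka sink only needs key/value columns, so the payload is passed through untouched
replay = (orphaned.selectExpr("CAST(key AS STRING) AS key",
                              "CAST(value AS STRING) AS value")
          .writeStream
          .outputMode("append")
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("topic", "transactions-processed")
          .option("checkpointLocation", "/tmp/checkpoints/orphaned_replay")
          .queryName("orphanedReplay")
          .start())

replay.awaitTermination()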


HTH


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 10 Jul 2021 at 00:36, Bruno Oliveira  wrote:

> I mean... I guess?
>
> But I don't really have Airflow here, and I didn't really want to fall
> back to a "batch"-kinda approach with Airflow
>
> I'd rather use a Dead Letter Queue approach instead (like I mentioned
> another topic for the failed ones, which is later consumed and pumps
> the messages back to the original topic),
> or something with Spark+Delta Lake instead...
>
> I was just hoping I could somehow retry/replay these "orphaned"
> transactions a bit more easily...
>
> *Question) *Those features of "Stateful Streaming" or "Continuous
> Processing" mode wouldn't help solve my case, would they?
>
> On Fri, Jul 9, 2021 at 8:19 PM Mich Talebzadeh 
> wrote:
>
>> Well this is a matter of using journal entries.
>>
>> What you can do is that those "orphaned" transactions that you cannot
>> pair through transaction_id can be written to a journal table in your
>> Postgres DB. Then you can pair them with the entries in the relevant
>> Postgres table. If time is not of the essence, this can be done
>> through a scheduling job every x minutes through airflow or something
>> similar on the database alone.
>>
>> HTH
>>
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 9 Jul 2021 at 23:53, Bruno Oliveira 
>> wrote:
>>
>>> That is exactly the case, Sebastian!
>>>
>>> - In practice, that "created" means "*authorized*", but I still cannot
>>> deduct anything from the customer balance
>>> - the "processed" means I can safely deduct the transaction_amount  from
>>> the customer balance,
>>> - and the "refunded" means I must give the transaction amount back to
>>> the customer balance
>>>
>>> So, technically, we cannot process something that is not "AUTHORIZED"
>>> (created) yet, nor can we process a refund for a transaction that has NOT
>>> been PROCESSED yet.
>>>
>>>
>>> *You have an authorisation, then the actual transaction and maybe a
 refund some time in the future. You want to proceed with a transaction only
 if you've seen the auth but in an eventually consistent system this might
 not always happen.*
>>>
>>>
>>> That's absolutely the case! So, yes, That's correct.
>>>
>>> *You are asking in the case of receiving the transaction before the auth
 how to retry later? *
>>>
>>>
>>> Yeah! I'm struggling for days on how to solve with Spark Structured
>>> Streaming...
>>>
>>> *Right now you are discarding those transactions that didn't match so
 you instead would need to persist them somewhere and either reinject them
 into the job that does lookup (say after x minutes) *
>>>
>>>
>>>
>>> *Right now, the best I could think of is: *
>>>
>>>- Say, I'm reading the messages w/ transaction_id [1, 2, 3] from
>>>Kafka (topic "transactions-processed")
>>>- Then I'm querying the database for these IDs that have the status
>>>"CREATED" (or "AUTHORIZED" to be more accurate), and it returns the
>>>transactions for IDs [1, 2]
>>>    - So, while it'll work for the ones with ID [1, 2], I would have to
>>>put that transaction_id 3 in another topic, say, "
>>>*transaction-processed-retry*"
>>>- And write yet another consumer, to fetch the messages from that 
>>> "*transaction-processed-retry"
>>>*and put them back to the original topic (transactions-processed)
>>>- And do something similar for the transactions-refunded
>>>
>>> *Q1) *I think this approach may work, but I can't stop th

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Bruno Oliveira
I mean... I guess?

But I don't really have Airflow here, and I didn't really want to fall
back to a "batch"-kinda approach with Airflow

I'd rather use a Dead Letter Queue approach instead (like I mentioned
another topic for the failed ones, which is later consumed and pumps
the messages back to the original topic),
or something with Spark+Delta Lake instead...

I was just hoping I could somehow retry/replay these "orphaned"
transactions a bit more easily...

*Question) *Those features of "Stateful Streaming" or "Continuous
Processing" mode wouldn't help solve my case, would they?

On Fri, Jul 9, 2021 at 8:19 PM Mich Talebzadeh 
wrote:

> Well this is a matter of using journal entries.
>
> What you can do is that those "orphaned" transactions that you cannot pair
> through transaction_id can be written to a journal table in your Postgres
> DB. Then you can pair them with the entries in the relevant Postgres table.
> If time is not of the essence, this can be done through a scheduling
> job every x minutes through airflow or something similar on the database
> alone.
>
> HTH
>
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 9 Jul 2021 at 23:53, Bruno Oliveira  wrote:
>
>> That is exactly the case, Sebastian!
>>
>> - In practice, that "created" means "*authorized*", but I still cannot
>> deduct anything from the customer balance
>> - the "processed" means I can safely deduct the transaction_amount  from
>> the customer balance,
>> - and the "refunded" means I must give the transaction amount back to the
>> customer balance
>>
>> So, technically, we cannot process something that is not "AUTHORIZED"
>> (created) yet, nor can we process a refund for a transaction that has NOT
>> been PROCESSED yet.
>>
>>
>> *You have an authorisation, then the actual transaction and maybe a
>>> refund some time in the future. You want to proceed with a transaction only
>>> if you've seen the auth but in an eventually consistent system this might
>>> not always happen.*
>>
>>
>> That's absolutely the case! So, yes, That's correct.
>>
>> *You are asking in the case of receiving the transaction before the auth
>>> how to retry later? *
>>
>>
>> Yeah! I'm struggling for days on how to solve with Spark Structured
>> Streaming...
>>
>> *Right now you are discarding those transactions that didn't match so you
>>> instead would need to persist them somewhere and either reinject them into
>>> the job that does lookup (say after x minutes) *
>>
>>
>>
>> *Right now, the best I could think of is: *
>>
>>- Say, I'm reading the messages w/ transaction_id [1, 2, 3] from
>>Kafka (topic "transactions-processed")
>>- Then I'm querying the database for these IDs that have the status
>>"CREATED" (or "AUTHORIZED" to be more accurate), and it returns the
>>transactions for IDs [1, 2]
>>    - So, while it'll work for the ones with ID [1, 2], I would have to
>>put that transaction_id 3 in another topic, say, "
>>*transaction-processed-retry*"
>>- And write yet another consumer, to fetch the messages from that 
>> "*transaction-processed-retry"
>>*and put them back to the original topic (transactions-processed)
>>- And do something similar for the transactions-refunded
>>
>> *Q1) *I think this approach may work, but I can't stop thinking I'm
>> overengineering this, and was wondering if there isn't a better approach...
>> ?
>>
>> *Is this what you are looking for?*
>>
>>
>> Yes, that's exactly it.
>>
>>
>> *Q2)* I know that, under the hood, Structured Streaming is actually
>> using the micro-batch engine,
>>  if I switched to *Continuous Processing*, would it make any
>> difference? Would it allow me any "retry" mechanism out of the box?
>>
>> *Q3)* I stumbled upon a *Stateful Streaming* (
>> https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming)
>> , but I have never ever used it before,
>> would that actually do something for my case (retrying/replaying
>> a given message) ?
>>
>>
>> Thank you very VERY much in advance!
>> Best regards
>>
>>
>> On Fri, Jul 9, 2021 at 6:36 PM Sebastian Piu 
>> wrote:
>>
>>> So in payment systems you have something similar I think
>>>
>>> You have an authorisation, then the actual transaction and maybe a
>>> refund some time in the future. You want to proceed with a transaction only
>>> if you've seen the auth but in an eventually consistent system this might
>>> not always happen.
>>>
>>> You are asking in the case of receiving the transaction before the auth
>>> how to retry later?
>>>
>>> Right now you are

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Mich Talebzadeh
Well this is a matter of using journal entries.

What you can do is that those "orphaned" transactions that you cannot pair
through transaction_id can be written to a journal table in your Postgres
DB. Then you can pair them with the entries in the relevant Postgres table.
If time is not of the essence, this can be done through a scheduling
job every x minutes through airflow or something similar on the database
alone.
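As a rough illustration of the journal idea (the connection details, journal table and column names below are assumptions, not something from this thread), the unpaired rows could be appended to a Postgres journal table from inside foreachBatch, and a scheduled job on the database side would then do the pairing:

from pyspark.sql import DataFrame

def send_to_journal(df: DataFrame, batch_id: int) -> None:
    # append this micro-batch of unmatched ("orphaned") transactions to a journal table
    (df.write
       .format("jdbc")
       .option("url", "jdbc:postgresql://dbhost:5432/payments")
       .option("dbtable", "transactions_journal")
       .option("user", "spark_user")
       .option("password", "***")
       .mode("append")
       .save())

# orphaned_df is assumed to be the streaming DataFrame of transactions that could not be paired
(orphaned_df.writeStream
    .outputMode("append")
    .foreachBatch(send_to_journal)
    .option("checkpointLocation", "/tmp/checkpoints/journal")
    .trigger(processingTime="60 seconds")
    .start())

# A scheduled job (airflow, cron + psql, etc.) can then pair journal rows with the main
# table on transaction_id, for example:
#   INSERT INTO transactions_processed (transaction_id, ...)
#   SELECT j.transaction_id, ...
#   FROM   transactions_journal j
#   JOIN   transactions t
#     ON   t.transaction_id = j.transaction_id AND t.status = 'CREATED';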

HTH




   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 9 Jul 2021 at 23:53, Bruno Oliveira  wrote:

> That is exactly the case, Sebastian!
>
> - In practice, that "created" means "*authorized*", but I still cannot
> deduct anything from the customer balance
> - the "processed" means I can safely deduct the transaction_amount  from
> the customer balance,
> - and the "refunded" means I must give the transaction amount back to the
> customer balance
>
> So, technically, we cannot process something that is not "AUTHORIZED"
> (created) yet, nor can we process a refund for a transaction that has NOT
> been PROCESSED yet.
>
>
> *You have an authorisation, then the actual transaction and maybe a refund
>> some time in the future. You want to proceed with a transaction only if
>> you've seen the auth but in an eventually consistent system this might not
>> always happen.*
>
>
> That's absolutely the case! So, yes, That's correct.
>
> *You are asking in the case of receiving the transaction before the auth
>> how to retry later? *
>
>
> Yeah! I'm struggling for days on how to solve with Spark Structured
> Streaming...
>
> *Right now you are discarding those transactions that didn't match so you
>> instead would need to persist them somewhere and either reinject them into
>> the job that does lookup (say after x minutes) *
>
>
>
> *Right now, the best I could think of is: *
>
>- Say, I'm reading the messages w/ transaction_id [1, 2, 3] from Kafka
>(topic "transactions-processed")
>- Then I'm querying the database for these IDs that have the status
>"CREATED" (or "AUTHORIZED" to be more accurate), and it returns the
>transactions for IDs [1, 2]
>    - So, while it'll work for the ones with ID [1, 2], I would have to
>put that transaction_id 3 in another topic, say, "
>*transaction-processed-retry*"
>- And write yet another consumer, to fetch the messages from that 
> "*transaction-processed-retry"
>*and put them back to the original topic (transactions-processed)
>- And do something similar for the transactions-refunded
>
> *Q1) *I think this approach may work, but I can't stop thinking I'm
> overengineering this, and was wondering if there isn't a better approach...
> ?
>
> *Is this what you are looking for?*
>
>
> Yes, that's exactly it.
>
>
> *Q2)* I know that, under the hood, Structured Streaming is actually using
> the micro-batch engine,
>  if I switched to *Continuous Processing*, would it make any
> difference? Would it allow me any "retry" mechanism out of the box?
>
> *Q3)* I stumbled upon a *Stateful Streaming* (
> https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming)
> , but I have never ever used it before,
> would that actually do something for my case (retrying/replaying a
> given message) ?
>
>
> Thank you very VERY much in advance!
> Best regards
>
>
> On Fri, Jul 9, 2021 at 6:36 PM Sebastian Piu 
> wrote:
>
>> So in payment systems you have something similar I think
>>
>> You have an authorisation, then the actual transaction and maybe a refund
>> some time in the future. You want to proceed with a transaction only if
>> you've seen the auth but in an eventually consistent system this might not
>> always happen.
>>
>> You are asking in the case of receiving the transaction before the auth
>> how to retry later?
>>
>> Right now you are discarding those transactions that didn't match so you
>> instead would need to persist them somewhere and either reinject them into
>> the job that does lookup (say after x minutes)
>>
>> Is this what you are looking for?
>>
>> On Fri, 9 Jul 2021, 9:44 pm Bruno Oliveira, 
>> wrote:
>>
>>> I'm terribly sorry, Mich. That was my mistake.
>>> The timestamps are not the same (I copy&pasted without realizing that,
>>> I'm really sorry for the confusion)
>>>
>>> Please assume NONE of the following transactions are in the database yet
>>>
>>> *transactions-created:*
>>> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04
>>> 11:01:00" }
>>> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
>>> 08:02:00" }
>>>
>>> *transactions-processed: *
>>> { "transacti

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Bruno Oliveira
That is exactly the case, Sebastian!

- In practice, that "created" means "*authorized*", but I still cannot
deduct anything from the customer balance
- the "processed" means I can safely deduct the transaction_amount  from
the customer balance,
- and the "refunded" means I must give the transaction amount back to the
customer balance

So, technically, we cannot process something that is not "AUTHORIZED"
(created) yet, nor can we process a refund for a transaction that has NOT
been PROCESSED yet.


*You have an authorisation, then the actual transaction and maybe a refund
> some time in the future. You want to proceed with a transaction only if
> you've seen the auth but in an eventually consistent system this might not
> always happen.*


That's absolutely the case! So, yes, That's correct.

*You are asking in the case of receiving the transaction before the auth
> how to retry later? *


Yeah! I'm struggling for days on how to solve with Spark Structured
Streaming...

*Right now you are discarding those transactions that didn't match so you
> instead would need to persist them somewhere and either reinject them into
> the job that does lookup (say after x minutes) *



*Right now, the best I could think of is: *

   - Say, I'm reading the messages w/ transaction_id [1, 2, 3] from Kafka
   (topic "transactions-processed")
   - Then I'm querying the database for these IDs that have the status
   "CREATED" (or "AUTHORIZED" to be more accurate), and it returns the
   transactions for IDs [1, 2]
   - So, while it'll work for the ones with ID [1, 2], I would have to put
   that transaction_id 3 in another topic, say, "
   *transaction-processed-retry*"
   - And write yet another consumer, to fetch the messages from that
"*transaction-processed-retry"
   *and put them back to the original topic (transactions-processed)
   - And do something similar for the transactions-refunded

*Q1) *I think this approach may work, but I can't stop thinking I'm
overengineering this, and was wondering if there isn't a better approach...
?
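One way to keep that split inside the existing job, as a hedged sketch (the active SparkSession "spark", the connection details, table, topic and column names are all placeholders or assumptions): inside foreachBatch, look up the CREATED ids, sink the matches, and park the leftovers on the retry topic via a plain left_anti join.

from pyspark.sql.functions import to_json, struct

def sink_or_retry(batch_df, batch_id):
    # ids that already have a CREATED row in Postgres (subquery pushed down via JDBC)
    created = (spark.read
               .format("jdbc")
               .option("url", "jdbc:postgresql://dbhost:5432/payments")
               .option("dbtable", "(SELECT transaction_id FROM transactions "
                                  "WHERE status = 'CREATED') AS created_ids")
               .option("user", "spark_user")
               .option("password", "***")
               .load())

    matched   = batch_df.join(created, "transaction_id", "left_semi")   # safe to sink
    leftovers = batch_df.join(created, "transaction_id", "left_anti")   # no CREATED row yet

    # ... write `matched` to Postgres here (JDBC write elided) ...

    # park the leftovers on the retry topic so they can be pumped back later
    (leftovers.select(to_json(struct(*batch_df.columns)).alias("value"))
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")
        .option("topic", "transaction-processed-retry")
        .save())

# wired into the streaming query, e.g.
# processedDF.writeStream.foreachBatch(sink_or_retry).option("checkpointLocation", ...).start()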

*Is this what you are looking for?*


Yes, that's exactly it.


*Q2)* I know that, under the hood, Structured Streaming is actually using
the micro-batch engine,
 if I switched to *Continuous Processing*, would it make any
difference? Would it allow me any "retry" mechanism out of the box?

*Q3)* I stumbled upon a *Stateful Streaming* (
https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming)
, but I have never ever used it before,
would that actually do something for my case (retrying/replaying a
given message) ?
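For what it's worth, the stateful machinery does cover one pattern that maps closely onto this created/processed pairing: a watermarked stream-stream join. Unmatched "processed" rows are buffered in the state store until the matching "created" row turns up or the watermark expires, which is essentially the retry behaviour being asked about. A minimal PySpark sketch follows; the broker address, schemas, topic names and the 3-hour watermark are all assumptions, and it reshapes the pipeline into a single query rather than the two separate consumers described above.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, LongType, TimestampType

spark = SparkSession.builder.appName("createdProcessedJoin").getOrCreate()

created_schema = (StructType()
                  .add("transaction_id", LongType())
                  .add("amount", LongType())
                  .add("timestamp", TimestampType()))
processed_schema = (StructType()
                    .add("transaction_id", LongType())
                    .add("timestamp", TimestampType()))

def read_topic(topic, schema):
    # placeholder broker address; schemas above are guesses from the sample JSON
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")
            .option("subscribe", topic)
            .option("startingOffsets", "latest")
            .load()
            .select(from_json(col("value").cast("string"), schema).alias("j"))
            .select("j.*"))

created = (read_topic("transactions-created", created_schema)
           .withColumnRenamed("timestamp", "created_ts")
           .withWatermark("created_ts", "3 hours"))

processed = (read_topic("transactions-processed", processed_schema)
             .withColumnRenamed("transaction_id", "p_transaction_id")
             .withColumnRenamed("timestamp", "processed_ts")
             .withWatermark("processed_ts", "3 hours"))

# a "processed" row is held in the state store until its "created" row arrives
# (or the watermark expires), i.e. the buffering/retry behaviour asked about
paired = processed.join(
    created,
    expr("""
        p_transaction_id = transaction_id AND
        processed_ts >= created_ts AND
        processed_ts <= created_ts + interval 3 hours
    """),
    "inner")

query = (paired.writeStream
         .outputMode("append")
         .format("console")                      # stand-in sink; real code would write to Postgres
         .option("checkpointLocation", "/tmp/checkpoints/paired")
         .start())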


Thank you very VERY much in advance!
Best regards


On Fri, Jul 9, 2021 at 6:36 PM Sebastian Piu 
wrote:

> So in payment systems you have something similar I think
>
> You have an authorisation, then the actual transaction and maybe a refund
> some time in the future. You want to proceed with a transaction only if
> you've seen the auth but in an eventually consistent system this might not
> always happen.
>
> You are asking in the case of receiving the transaction before the auth
> how to retry later?
>
> Right now you are discarding those transactions that didn't match so you
> instead would need to persist them somewhere and either reinject them into
> the job that does lookup (say after x minutes)
>
> Is this what you are looking for?
>
> On Fri, 9 Jul 2021, 9:44 pm Bruno Oliveira,  wrote:
>
>> I'm terribly sorry, Mich. That was my mistake.
>> The timestamps are not the same (I copy&pasted without realizing that,
>> I'm really sorry for the confusion)
>>
>> Please assume NONE of the following transactions are in the database yet
>>
>> *transactions-created:*
>> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04
>> 11:01:00" }
>> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
>> 08:02:00" }
>>
>> *transactions-processed: *
>> { "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" } // so
>> it's processed 2 minutes after it was created
>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" } // so
>> it's processed 4 hours after it was created
>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }// cannot
>> be persisted into the DB yet, because this "transaction_id 3" with the
>> status "CREATED" does NOT exist in the DB
>>
>>
>> *(...) Transactions-created are created at the same time (the same
>>> timestamp) but you have NOT received them and they don't yet exist in your
>>> DB (...)*
>>
>> - Not at the same timestamp, that was my mistake.
>> - Imagine two transactions with the same ID (neither of them are in any
>> Kafka topic yet),
>>
>>- One with the status CREATED, and another with the status PROCESSED,
>>- The one with the status PROCESSED will ALWAYS have a higher/greater
>>timestamp than the one with the status CREATED
>>- Now for whatever reason, this happens:
>>   - Step a) some producer *fails* to push the *created* one to the
>>  

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Sebastian Piu
So in payment systems you have something similar I think

You have an authorisation, then the actual transaction and maybe a refund
some time in the future. You want to proceed with a transaction only if
you've seen the auth but in an eventually consistent system this might not
always happen.

You are asking in the case of receiving the transaction before the auth how
to retry later?

Right now you are discarding those transactions that didn't match so you
instead would need to persist them somewhere and either reinject them into
the job that does lookup (say after x minutes)
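One lightweight way to do the "reinject into the job that does the lookup" part, sketched here with assumed topic names and an existing SparkSession "spark", is to have that job subscribe to both the original topic and a retry topic in a single Kafka source, so anything parked on the retry topic flows back through the same lookup logic:

# the lookup job consumes the original topic and the retry topic together
stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "transactions-processed,transaction-processed-retry")
          .option("startingOffsets", "latest")
          .load())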

Is this what you are looking for?

On Fri, 9 Jul 2021, 9:44 pm Bruno Oliveira,  wrote:

> I'm terribly sorry, Mich. That was my mistake.
> The timestamps are not the same (I copy&pasted without realizing that, I'm
> really sorry for the confusion)
>
> Please assume NONE of the following transactions are in the database yet
>
> *transactions-created:*
> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04 11:01:00"
> }
> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
> 08:02:00" }
>
> *transactions-processed: *
> { "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" } // so it's
> processed 2 minutes after it was created
> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" } // so it's
> processed 4 hours after it was created
> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }// cannot
> be persisted into the DB yet, because this "transaction_id 3" with the
> status "CREATED" does NOT exist in the DB
>
>
> *(...) Transactions-created are created at the same time (the same
>> timestamp) but you have NOT received them and they don't yet exist in your
>> DB (...)*
>
> - Not at the same timestamp, that was my mistake.
> - Imagine two transactions with the same ID (neither of them are in any
> Kafka topic yet),
>
>- One with the status CREATED, and another with the status PROCESSED,
>- The one with the status PROCESSED will ALWAYS have a higher/greater
>timestamp than the one with the status CREATED
>- Now for whatever reason, this happens:
>   - Step a) some producer *fails* to push the *created* one to the
>   topic  *transactions-created, it will RETRY, and will eventually
>   succeed, but that can take minutes, or hours*
>   - Step b) however, the producer *succeeds* in pushing the*
>   'processed' *one to the topic *transactions-processed *
>
>
> *(...) because presumably your relational database is too slow to ingest
>> them? (...)*
>
>
> - it's not like the DB was slow, it was because the message for
> transaction_id 3 didn't arrive at the *topic-created *yet, due to some
> error/failure in Step A, for example
>
>
> * you do a query in Postgres for say transaction_id 3 but they don't exist
>> yet? When are they expected to arrive?*
>
>
> - That's correct. It could take minutes, maybe hours. But it is guaranteed
> that at some point, in the future, they will arrive. I just have to keep
> trying until it works, this transaction_id 3 with the status CREATED
> arrives at the database
>
>
> Huge apologies for the confusion... Is it a bit more clear now?
>
> *PS:* This is a simplified scenario; in practice, there is yet another
> topic for "transactions-refunded", which cannot be sunk to the DB,
> unless the same transaction_id with the status "PROCESSED" is there. (but
> again, there can only be a transaction_id PROCESSED, if the same
> transaction_id with CREATED exists in the DB)
>
>
> On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh 
> wrote:
>
>> One second
>>
>> The topic called transactions_processed is streaming through Spark.
>> Transactions-created are created at the same time (the same timestamp) but
>> you have NOT received them and they don't yet exist in your DB,
>> because presumably your relational database is too slow to ingest them? you
>> do a query in Postgres for say transaction_id 3 but they don't exist yet?
>> When are they expected to arrive?
>>
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira 
>> wrote:
>>
>>> Thanks for the quick reply!
>>>
>>> I'm not sure I got the idea correctly... but from what I'm understanding,
>>> wouldn't that actually end up the same way?
>>> Because, this is the current scenario:
>>>
>>> *transactions-processed: *
>>> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
>>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
>>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
>>> { "transaction_id": 4, "times

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Bruno Oliveira
I'm terribly sorry, Mich. That was my mistake.
The timestamps are not the same (I copy&pasted without realizing that, I'm
really sorry for the confusion)

Please assume NONE of the following transactions are in the database yet

*transactions-created:*
{ "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04 11:01:00"
}
{ "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
08:02:00" }

*transactions-processed: *
{ "transaction_id": 1, "timestamp": "2020-04-04 11:03:00" } // so it's
processed 2 minutes after it was created
{ "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" } // so it's
processed 4 hours after it was created
{ "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }// cannot be
persisted into the DB yet, because this "transaction_id 3" with the status
"CREATED" does NOT exist in the DB


*(...) Transactions-created are created at the same time (the same
> timestamp) but you have NOT received them and they don't yet exist in your
> DB (...)*

- Not at the same timestamp, that was my mistake.
- Imagine two transactions with the same ID (neither of them are in any
Kafka topic yet),

   - One with the status CREATED, and another with the status PROCESSED,
   - The one with the status PROCESSED will ALWAYS have a higher/greater
   timestamp than the one with the status CREATED
   - Now for whatever reason, this happens:
  - Step a) some producer *fails* to push the *created* one to the
  topic  *transactions-created, it will RETRY, and will eventually
  succeed, but that can take minutes, or hours*
  - Step b) however, the producer *succeeds* in pushing the*
  'processed' *one to the topic *transactions-processed *


*(...) because presumably your relational database is too slow to ingest
> them? (...)*


- it's not like the DB was slow, it was because the message for
transaction_id 3 didn't arrive at the *topic-created *yet, due to some
error/failure in Step A, for example


* you do a query in Postgres for say transaction_id 3 but they don't exist
> yet? When are they expected to arrive?*


- That's correct. It could take minutes, maybe hours. But it is guaranteed
that at some point, in the future, they will arrive. I just have to keep
trying until it works, this transaction_id 3 with the status CREATED
arrives at the database


Huge apologies for the confusion... Is it a bit more clear now?

*PS:* This is a simplified scenario; in practice, there is yet another
topic for "transactions-refunded", which cannot be sunk to the DB,
unless the same transaction_id with the status "PROCESSED" is there. (but
again, there can only be a transaction_id PROCESSED, if the same
transaction_id with CREATED exists in the DB)


On Fri, Jul 9, 2021 at 4:51 PM Mich Talebzadeh 
wrote:

> One second
>
> The topic called transactions_processed is streaming through Spark.
> Transactions-created are created at the same time (the same timestamp) but
> you have NOT received them and they don't yet exist in your DB,
> because presumably your relational database is too slow to ingest them? you
> do a query in Postgres for say transaction_id 3 but they don't exist yet?
> When are they expected to arrive?
>
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira  wrote:
>
>> Thanks for the quick reply!
>>
>> I'm not sure I got the idea correctly... but from what I'm understanding,
>> wouldn't that actually end up the same way?
>> Because, this is the current scenario:
>>
>> *transactions-processed: *
>> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
>> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
>> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
>> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }
>>
>> *transactions-created:*
>> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04
>> 11:01:00" }
>> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
>> 12:02:00" }
>>
>> - So, when I fetch ALL messages from both topics, there are still 2x
>> transactions (id: "*3*" and "*4*") which do *not* exist in the topic
>> "transaction-created" yet (and they aren't in Postgres either)
>> - But since they were pulled by "Structured Streaming" already, they'll
>> be kinda marked as "processed" by the Spark Structured Streaming checkpoint
>> anyway.
>>
>> And therefore, I can't replay/reprocess them again...
>>
>> Is my understanding correct? Am I missing something here?
>>
>> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh 
>> wrote:
>>
>>> Thanks for the details.
>>>
>>> Can yo

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Mich Talebzadeh
One second

The topic called transactions_processed is streaming through Spark.
Transactions-created are created at the same time (the same timestamp) but
you have NOT received them and they don't yet exist in your DB,
because presumably your relational database is too slow to ingest them? you
do a query in Postgres for say transaction_id 3 but they don't exist yet?
When are they expected to arrive?




   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 9 Jul 2021 at 19:12, Bruno Oliveira  wrote:

> Thanks for the quick reply!
>
> I'm not sure I got the idea correctly... but from what I'm understanding,
> wouldn't that actually end up the same way?
> Because, this is the current scenario:
>
> *transactions-processed: *
> { "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
> { "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
> { "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
> { "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }
>
> *transactions-created:*
> { "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04 11:01:00"
> }
> { "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
> 12:02:00" }
>
> - So, when I fetch ALL messages from both topics, there are still 2x
> transactions (id: "*3*" and "*4*") which do *not* exist in the topic
> "transaction-created" yet (and they aren't in Postgres either)
> - But since they were pulled by "Structured Streaming" already, they'll be
> kinda marked as "processed" by the Spark Structured Streaming checkpoint anyway.
>
> And therefore, I can't replay/reprocess them again...
>
> Is my understanding correct? Am I missing something here?
>
> On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh 
> wrote:
>
>> Thanks for the details.
>>
>>> Can you read these in the same app? For example, this is PySpark but it
>>> serves the purpose.
>>
>> Read topic "newtopic" in micro batch and the other topic "md" in another
>> microbatch
>>
>> try:
>> # process topic --> newtopic
>> streamingNewtopic = self.spark \
>> .readStream \
>> .format("kafka") \
>> .option("kafka.bootstrap.servers",
>> config['MDVariables']['bootstrapServers'],) \
>> .option("schema.registry.url",
>> config['MDVariables']['schemaRegistryURL']) \
>> .option("group.id", config['common']['newtopic']) \
>> .option("zookeeper.connection.timeout.ms",
>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>> .option("rebalance.backoff.ms",
>> config['MDVariables']['rebalanceBackoffMS']) \
>> .option("zookeeper.session.timeout.ms",
>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>> .option("auto.commit.interval.ms",
>> config['MDVariables']['autoCommitIntervalMS']) \
>> *.option("subscribe", config['MDVariables']['newtopic'])
>> \*
>> .option("failOnDataLoss", "false") \
>> .option("includeHeaders", "true") \
>> .option("startingOffsets", "latest") \
>> .load() \
>> .select(from_json(col("value").cast("string"),
>> newtopicSchema).alias("newtopic_value"))
>>
>> # construct a streaming dataframe streamingDataFrame that
>> subscribes to topic config['MDVariables']['topic']) -> md (market data)
>> streamingDataFrame = self.spark \
>> .readStream \
>> .format("kafka") \
>> .option("kafka.bootstrap.servers",
>> config['MDVariables']['bootstrapServers'],) \
>> .option("schema.registry.url",
>> config['MDVariables']['schemaRegistryURL']) \
>> .option("group.id", config['common']['appName']) \
>> .option("zookeeper.connection.timeout.ms",
>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>> .option("rebalance.backoff.ms",
>> config['MDVariables']['rebalanceBackoffMS']) \
>> .option("zookeeper.session.timeout.ms",
>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>> .option("auto.commit.interval.ms",
>> config['MDVariables']['autoCommitIntervalMS']) \
>> *.option("subscribe", config['MDVariables']['topic']) \*
>> .option("failOnDataLoss", "false") \
>> .option("includeHeaders", "true") \
>> .option("startingOffsets", "latest") \
>> .load() \
>> .select(from_json(col("value").cast("string"),
>> schema).alias("parsed_value"))
>>
>>
>> streamingNewtopic

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Bruno Oliveira
Thanks for the quick reply!

I'm not sure I got the idea correctly... but from what I'm understanding,
wouldn't that actually end up the same way?
Because, this is the current scenario:

*transactions-processed: *
{ "transaction_id": 1, "timestamp": "2020-04-04 11:01:00" }
{ "transaction_id": 2, "timestamp": "2020-04-04 12:02:00" }
{ "transaction_id": 3, "timestamp": "2020-04-04 13:03:00" }
{ "transaction_id": 4, "timestamp": "2020-04-04 14:04:00" }

*transactions-created:*
{ "transaction_id": 1, "amount":  1000, "timestamp": "2020-04-04 11:01:00"
}
{ "transaction_id": 2, "amount":  2000, "timestamp": "2020-04-04
12:02:00" }

- So, when I fetch ALL messages from both topics, there are still 2x
transactions (id: "*3*" and "*4*") which do *not* exist in the topic
"transaction-created" yet (and they aren't in Postgres either)
- But since they were pulled by "Structured Streaming" already, they'll be
kinda marked as "processed" by the Spark Structured Streaming checkpoint anyway.

And therefore, I can't replay/reprocess them again...

Is my understanding correct? Am I missing something here?

On Fri, Jul 9, 2021 at 2:02 PM Mich Talebzadeh 
wrote:

> Thanks for the details.
>
> Can you read these in the same app? For example, this is PySpark but it
> serves the purpose.
>
> Read topic "newtopic" in micro batch and the other topic "md" in another
> microbatch
>
> try:
> # process topic --> newtopic
> streamingNewtopic = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['newtopic']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> *.option("subscribe", config['MDVariables']['newtopic'])
> \*
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> newtopicSchema).alias("newtopic_value"))
>
> # construct a streaming dataframe streamingDataFrame that
> subscribes to topic config['MDVariables']['topic']) -> md (market data)
> streamingDataFrame = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['appName']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> *.option("subscribe", config['MDVariables']['topic']) \*
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
>
> streamingNewtopic.printSchema()
>
> # Now do a writeStream and call the relevant functions to
> process dataframes
>
> newtopicResult = streamingNewtopic.select( \
>  col("newtopic_value.uuid").alias("uuid") \
>, col("newtopic_value.timeissued").alias("timeissued") \
>, col("newtopic_value.queue").alias("queue") \
>, col("newtopic_value.status").alias("status")). \
>  writeStream. \
>  outputMode('append'). \
>  option("truncate", "false"). \
>      foreachBatch(sendToControl). \
>  trigger(processingTime='2 seconds'). \
>  queryName(config['MDVariables']['newtopic']). \
>  start()
>
> result = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.ticker").alias("ticker")

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Mich Talebzadeh
Thanks for the details.

Can you read these in the same app? For example, this is PySpark but it
serves the purpose.

Read topic "newtopic" in micro batch and the other topic "md" in another
microbatch

try:
# process topic --> newtopic
streamingNewtopic = self.spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers'],) \
.option("schema.registry.url",
config['MDVariables']['schemaRegistryURL']) \
.option("group.id", config['common']['newtopic']) \
.option("zookeeper.connection.timeout.ms",
config['MDVariables']['zookeeperConnectionTimeoutMs']) \
.option("rebalance.backoff.ms",
config['MDVariables']['rebalanceBackoffMS']) \
.option("zookeeper.session.timeout.ms",
config['MDVariables']['zookeeperSessionTimeOutMs']) \
.option("auto.commit.interval.ms",
config['MDVariables']['autoCommitIntervalMS']) \
*.option("subscribe", config['MDVariables']['newtopic']) \*
.option("failOnDataLoss", "false") \
.option("includeHeaders", "true") \
.option("startingOffsets", "latest") \
.load() \
.select(from_json(col("value").cast("string"),
newtopicSchema).alias("newtopic_value"))

# construct a streaming dataframe streamingDataFrame that
subscribes to topic config['MDVariables']['topic']) -> md (market data)
streamingDataFrame = self.spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
config['MDVariables']['bootstrapServers'],) \
.option("schema.registry.url",
config['MDVariables']['schemaRegistryURL']) \
.option("group.id", config['common']['appName']) \
.option("zookeeper.connection.timeout.ms",
config['MDVariables']['zookeeperConnectionTimeoutMs']) \
.option("rebalance.backoff.ms",
config['MDVariables']['rebalanceBackoffMS']) \
.option("zookeeper.session.timeout.ms",
config['MDVariables']['zookeeperSessionTimeOutMs']) \
.option("auto.commit.interval.ms",
config['MDVariables']['autoCommitIntervalMS']) \
*.option("subscribe", config['MDVariables']['topic']) \*
.option("failOnDataLoss", "false") \
.option("includeHeaders", "true") \
.option("startingOffsets", "latest") \
.load() \
.select(from_json(col("value").cast("string"),
schema).alias("parsed_value"))


streamingNewtopic.printSchema()

# Now do a writeStream and call the relevant functions to
process dataframes

newtopicResult = streamingNewtopic.select( \
 col("newtopic_value.uuid").alias("uuid") \
   , col("newtopic_value.timeissued").alias("timeissued") \
   , col("newtopic_value.queue").alias("queue") \
   , col("newtopic_value.status").alias("status")). \
 writeStream. \
 outputMode('append'). \
 option("truncate", "false"). \
     foreachBatch(sendToControl). \
 trigger(processingTime='2 seconds'). \
 queryName(config['MDVariables']['newtopic']). \
 start()

result = streamingDataFrame.select( \
 col("parsed_value.rowkey").alias("rowkey") \
   , col("parsed_value.ticker").alias("ticker") \
   , col("parsed_value.timeissued").alias("timeissued") \
   , col("parsed_value.price").alias("price")). \
 writeStream. \
 outputMode('append'). \
 option("truncate", "false"). \
 foreachBatch(sendToSink). \
 trigger(processingTime='30 seconds'). \
 option('checkpointLocation', checkpoint_path). \
 queryName(config['MDVariables']['topic']). \
 start()
print(result)

except Exception as e:
print(f"""{e}, quitting""")
sys.exit(1)

Inside that function say *sendToSink *you can get the df and batchId

def sendToSink(df, batchId):
if(len(df.take(1))) > 0:
print(f"""md batchId is {batchId}""")
df.show(100,False)
df.persist()
# write to BigQuery batch table
s.writeTableToBQ(df, "append",
config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
df.unpersist()
print(f"""wrote to DB""")
else:
print("DataFrame md is empty")

And you have created DF from the other topic newtopic

def sendToControl(dfnewtopic, batchId):
if(len(dfnewtopic.take(1))) > 0:
 

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Bruno Oliveira
Hello! Sure thing!

I'm reading them *separately*, both are apps written with Scala + Spark
Structured Streaming.

I feel like I missed some details on my original thread (sorry it was past
4 AM) and it was getting frustrating
Please let me try to clarify some points:

*Transactions Created Consumer*
Kafka topic "trx-created-topic"  --->  (Scala + Spark Structured Streaming)
ConsumerApp  --->  sinks to the Postgres DB table (Transactions)

*Transactions Processed Consumer*
Kafka topic "trx-processed-topic"  --->  (Scala + Spark Structured
Streaming) AnotherConsumerApp, which:
   1) Fetches a Dataset from the topic (let's call it "a")
   2) Selects the ids
   3) Fetches the rows from the Postgres transactions table with the
   matching ids that have status 'created' (let's call it "b")
   4) Performs an intersection between "a" and "b", resulting in
   "b_that_needs_sinking" (but now there are some "b_leftovers" that were
   out of the intersection)
   5) Sinks "b_that_needs_sinking" to the DB, which leaves the
   "b_leftovers" as unprocessed (not persisted)
   6) However, those "b_leftovers" would, ultimately, be processed at some
   point (even if it takes like 1-3 days) - when their corresponding
   transaction_ids are pushed to the "trx-created-topic" Kafka topic, and
   are then processed by that first consumer

So, what I'm trying to accomplish is to find a way to reprocess those
"b_leftovers" *without* having to restart the app.
Does that make sense?

PS: It doesn't necessarily have to be real streaming, if micro-batching
(legacy Spark Streaming) would allow such a thing, it would technically
work (although I keep hearing it's not advisable)

Thank you so much!

Kind regards

On Fri, Jul 9, 2021 at 12:13 PM Mich Talebzadeh 
wrote:

> Can you please clarify if you are reading these two topics separately or
> within the same scala or python script in Spark Structured Streaming?
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira  wrote:
>
>> Hello guys,
>>
>> I've been struggling with this for some days now, without success, so I
>> would highly appreciate any enlightenment. The simplified scenario is the
>> following:
>>
>>- I've got 2 topics in Kafka (it's already like that in production,
>>can't change it)
>>   - transactions-created,
>>   - transaction-processed
>>- Even though the schema is not exactly the same, they all share a
>>correlation_id, which is their "transaction_id"
>>
>> So, long story short, I've got 2 consumers, one for each topic, and all I
>> wanna do is sink them in a chain order. I'm writing them w/ Spark
>> Structured Streaming, btw
>>
>> So far so good, the caveat here is:
>>
>> - I cannot write a given "*processed" *transaction unless there is an
>> entry of that same transaction with the status "*created*".
>>
>> - There is *no* guarantee that any transactions in the topic
>> "transaction-*processed*" have a match (same transaction_id) in the
>> "transaction-*created*" at the moment the messages are fetched.
>>
>> So the workflow so far is:
>> - Msgs from the "transaction-created" just get synced to postgres, no
>> questions asked
>>
>> - As for the "transaction-processed", it goes as follows:
>>
>>- a) Messages are fetched from the Kafka topic
>>- b) Select the transaction_id of those...
>>- c) Fetch all the rows w/ the corresponding id from a Postgres table
>>AND that have the status "CREATED"
>>    - d) Then, I pretty much do an intersection between the two datasets,
>>    and sink only the "processed" ones that have a match from step c
>>- e) Persist the resulting dataset
>>
>> But the rows (from the 'processed') that were not part of the
>> intersection get lost afterwards...
>>
>> So my question is:
>> - Is there ANY way to reprocess/replay them at all WITHOUT restarting the
>> app?
>> - For this scenario, should I fall back to Spark Streaming, instead of
>> Structured Streaming?
>>
>> PS: I was playing around with Spark Streaming (legacy) and managed to
>> commit only the ones in the microbatches that were fully successful (still
>> failed to find a way to "poll" for the uncommitted ones without restarting,
>> though).
>>
>> Thank you very much in advance!
>>
>>


Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Mich Talebzadeh
Can you please clarify if you are reading these two topics separately or
within the same scala or python script in Spark Structured Streaming?

HTH


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 9 Jul 2021 at 13:44, Bruno Oliveira  wrote:

> Hello guys,
>
> I've been struggling with this for some days now, without success, so I
> would highly appreciate any enlightenment. The simplified scenario is the
> following:
>
>- I've got 2 topics in Kafka (it's already like that in production,
>can't change it)
>   - transactions-created,
>   - transaction-processed
>- Even though the schema is not exactly the same, they all share a
>correlation_id, which is their "transaction_id"
>
> So, long story short, I've got 2 consumers, one for each topic, and all I
> wanna do is sink them in a chain order. I'm writing them w/ Spark
> Structured Streaming, btw
>
> So far so good, the caveat here is:
>
> - I cannot write a given "*processed" *transaction unless there is an
> entry of that same transaction with the status "*created*".
>
> - There is *no* guarantee that any transactions in the topic "transaction-
> *processed*" have a match (same transaction_id) in the "transaction-
> *created*" at the moment the messages are fetched.
>
> So the workflow so far is:
> - Msgs from the "transaction-created" just get synced to postgres, no
> questions asked
>
> - As for the "transaction-processed", it goes as follows:
>
>- a) Messages are fetched from the Kafka topic
>- b) Select the transaction_id of those...
>- c) Fetch all the rows w/ the corresponding id from a Postgres table
>AND that have the status "CREATED"
>    - d) Then, I pretty much do an intersection between the two datasets,
>    and sink only the "processed" ones that have a match from step c
>- e) Persist the resulting dataset
>
> But the rows (from the 'processed') that were not part of the intersection
> get lost afterwards...
>
> So my question is:
> - Is there ANY way to reprocess/replay them at all WITHOUT restarting the
> app?
> - For this scenario, should I fall back to Spark Streaming, instead of
> Structured Streaming?
>
> PS: I was playing around with Spark Streaming (legacy) and managed to
> commit only the ones in the microbatches that were fully successful (still
> failed to find a way to "poll" for the uncommitted ones without restarting,
> though).
>
> Thank you very much in advance!
>
>


Re: Spark Structured Streaming

2021-05-31 Thread S
Hi Mich,

I agree with you; spark streaming will become defunct in favor of
Structured Streaming. And I have gone over the document in detail. I am
aware of the unbounded datasets and running aggregate etc..

Nevertheless, I wouldn't say it's a moot point, as it provides a good
intuition of the evolution of the spark streaming model into the Structured
Streaming model. The microbatch model is a commonality between the dstream
model and the structured streaming model, but it would be a bit unsettling to
think that the Structured Streaming model uses the same old receiver-based
approach to achieve the fixed-interval microbatch. It does, however, fit
the direct-based approach. I just wanted to get this conclusion that I
arrived at verified by the broader contributing community.

Regards,
Sheel

On Tue, 1 Jun, 2021, 1:57 AM Mich Talebzadeh, 
wrote:

> Hi,
>
> I guess whether structured streaming (SS) inherited anything from spark
> streaming is a moot point now, although it is a concept built on spark
> streaming which will be defunct soon.
>
> Going forward, it all depends on what problem you are trying to address.
>
> These are explained in the following doc
> 
>
> However, within SSS micro-batching, you have the concept of working out
> running aggregates within a given timeframe  akin to spark streaming with
> sliding window and window's length.
>
> The other one relies on a fixed triggering mechanism to invoke a function
> to perform some specific tasks (processing CDC, writing the result set to a
> database, working out average prices per security etc) on streaming data in
> that triggering period.
>
> To see a discussion on running aggregates please look for the following
> thread in this forum
>
> "Calculate average from Spark stream"
>
> And for triggering mechanism you can see an example in my linkedin  below
>
>
> https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/
>
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 31 May 2021 at 19:32, S  wrote:
>
>> Hi,
>>
>> I am using Structured Streaming on Azure HdInsight. The version is 2.4.6.
>>
>> I am trying to understand the microbatch mode - default and fixed
>> intervals. Does the fixed interval microbatch follow something similar to
>> receiver based model where records keep getting pulled and stored into
>> blocks for the duration of the interval at the end of which a job is kicked
>> off? Or, does the job just process the current microbatch and sleep for the
>> rest of the interval and pulls records only at the end of the interval?
>>
>> I am fully aware of the two dstreams models - receiver and direct based
>> dstreams. I am just trying to figure out if either of these two models were
>> reused in Structured Streaming.
>>
>> Regards,
>> Sheel
>>
>


Re: Spark Structured Streaming

2021-05-31 Thread Mich Talebzadeh
Hi,

I guess whether structured streaming (SS) inherited anything from spark
streaming is a moot point now, although it is a concept built on spark
streaming which will be defunct soon.

Going forward, it all depends on what problem you are trying to address.

These are explained in the following doc


However, within SSS micro-batching, you have the concept of working out
running aggregates within a given timeframe  akin to spark streaming with
sliding window and window's length.
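For illustration, a running aggregate over a sliding event-time window looks roughly like this in PySpark (prices_df and its columns are assumed, not taken from this thread):

from pyspark.sql.functions import window, col, avg

# prices_df is assumed to be a streaming DataFrame with ticker, price and timeissued columns
running_avg = (prices_df
               .withWatermark("timeissued", "10 minutes")
               .groupBy(window(col("timeissued"), "10 minutes", "5 minutes"),   # length, slide
                        col("ticker"))
               .agg(avg("price").alias("avg_price")))

(running_avg.writeStream
    .outputMode("update")
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/running_avg")
    .start())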

The other one relies on a fixed triggering mechanism to invoke a function
to perform some specific tasks (processing CDC, writing the result set to a
database, working out average prices per security etc) on streaming data in
that triggering period.
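And the fixed-trigger route referred to here boils down to foreachBatch plus a processing-time trigger; a small sketch, reusing the assumed prices_df from the sketch above and with a placeholder function body:

def process_micro_batch(batch_df, batch_id):
    # placeholder for the real work (CDC handling, writing to a database, averages, ...)
    if len(batch_df.take(1)) > 0:
        batch_df.show(truncate=False)

(prices_df.writeStream
    .outputMode("append")
    .foreachBatch(process_micro_batch)         # invoked once per trigger
    .trigger(processingTime="30 seconds")      # fixed triggering interval
    .option("checkpointLocation", "/tmp/checkpoints/trigger_demo")
    .start())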

To see a discussion on running aggregates please look for the following
thread in this forum

"Calculate average from Spark stream"

And for triggering mechanism you can see an example in my linkedin  below

https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/


HTH



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 31 May 2021 at 19:32, S  wrote:

> Hi,
>
> I am using Structured Streaming on Azure HdInsight. The version is 2.4.6.
>
> I am trying to understand the microbatch mode - default and fixed
> intervals. Does the fixed interval microbatch follow something similar to
> receiver based model where records keep getting pulled and stored into
> blocks for the duration of the interval at the end of which a job is kicked
> off? Or, does the job just process the current microbatch and sleep for the
> rest of the interval and pulls records only at the end of the interval?
>
> I am fully aware of the two dstreams models - receiver and direct based
> dstreams. I am just trying to figure out if either of these two models were
> reused in Structured Streaming.
>
> Regards,
> Sheel
>


Re: Spark Structured Streaming 'bool' object is not callable, quitting

2021-04-21 Thread Mich Talebzadeh
yes indeed Russell, a silly mistake by me.

Thanks



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 21 Apr 2021 at 23:42, Russell Spitzer 
wrote:

> Callable means you tried to treat a field as a function like in the
> following example
>
> >>> fun = True
> >>> fun()
> Traceback (most recent call last):
>   File "", line 1, in 
> TypeError: 'bool' object is not callable
>
> My guess is that "isStreaming" is a bool, and in your syntax you used it
> as a function "isStreaming()"
>
> On Wed, Apr 21, 2021 at 5:35 PM Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> I am testing something in Spark Structured Streaming, this is a new topic
>>
>>  Typical kafka json row
>>
>>   dummy2 = StructType().add("uuid", StringType()).add("timecreated",
>> TimestampType()).add("status", StringType())
>>
>> # example
>> ##856095c6-cdec-485b-9da6-d78275bc0a25
>> {"uuid":"856095c6-cdec-485b-9da6-d78275bc0a25",
>> "timecreated":"2021-04-21T23:29:16", "status":true"}
>>
>>   Created the Streaming read
>>
>>
>>  streamingDummy = self.spark \
>> .readStream \
>> .format("kafka") \
>> .option("kafka.bootstrap.servers",
>> config['MDVariables']['bootstrapServers'],) \
>> .option("schema.registry.url",
>> config['MDVariables']['schemaRegistryURL']) \
>> .option("group.id", config['common']['dummy']) \
>> .option("zookeeper.connection.timeout.ms",
>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>> .option("rebalance.backoff.ms",
>> config['MDVariables']['rebalanceBackoffMS']) \
>> .option("zookeeper.session.timeout.ms",
>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>> .option("auto.commit.interval.ms",
>> config['MDVariables']['autoCommitIntervalMS']) \
>> .option("subscribe", config['MDVariables']['topicdummy'])
>> \
>> .option("failOnDataLoss", "false") \
>> .option("includeHeaders", "true") \
>> .option("startingOffsets", "latest") \
>> .load() \
>> .select(from_json(col("value").cast("string"),
>> dummy2).alias("dummy_value"))
>>
>> streamingDummy.printSchema()
>>   *print(streamingDummy.isStreaming())*
>>
>> When I run it I get
>>
>>
>> root
>>
>>  |-- dummy_value: struct (nullable = true)
>>
>>  ||-- uuid: string (nullable = true)
>>
>>  ||-- timecreated: timestamp (nullable = true)
>>
>>  ||-- status: string (nullable = true)
>>
>>
>> *'bool' object is not callable, quitting*
>>
>> I cannot figure out the last line!
>>
>>
>> Thanks
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Re: Spark Structured Streaming 'bool' object is not callable, quitting

2021-04-21 Thread Mich Talebzadeh
Apologies, it was a mistake on my part.

The correct syntax is without the additional ():

 *print(streamingDummy.isStreaming)*

It returns


True

Thanks


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 21 Apr 2021 at 23:34, Mich Talebzadeh 
wrote:

> Hi,
>
> I am testing something in Spark Structured Streaming, this is a new topic
>
>  Typical kafka json row
>
>   dummy2 = StructType().add("uuid", StringType()).add("timecreated",
> TimestampType()).add("status", StringType())
>
> # example
> ##856095c6-cdec-485b-9da6-d78275bc0a25
> {"uuid":"856095c6-cdec-485b-9da6-d78275bc0a25",
> "timecreated":"2021-04-21T23:29:16", "status":true"}
>
>   Created the Streaming read
>
>
>  streamingDummy = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['dummy']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", config['MDVariables']['topicdummy']) \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> dummy2).alias("dummy_value"))
>
> streamingDummy.printSchema()
>   *print(streamingDummy.isStreaming())*
>
> When I run it I get
>
>
> root
>
>  |-- dummy_value: struct (nullable = true)
>
>  ||-- uuid: string (nullable = true)
>
>  ||-- timecreated: timestamp (nullable = true)
>
>  ||-- status: string (nullable = true)
>
>
> *'bool' object is not callable, quitting*
>
> I cannot figure out the last line!
>
>
> Thanks
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Spark Structured Streaming 'bool' object is not callable, quitting

2021-04-21 Thread Russell Spitzer
Callable means you tried to treat a field as a function like in the
following example

>>> fun = True
>>> fun()
Traceback (most recent call last):
  File "", line 1, in 
TypeError: 'bool' object is not callable

My guess is that "isStreaming" is a bool, and in your syntax you used it as
a function "isStreaming()"

On Wed, Apr 21, 2021 at 5:35 PM Mich Talebzadeh 
wrote:

> Hi,
>
> I am testing something in Spark Structured Streaming, this is a new topic
>
>  Typical kafka json row
>
>   dummy2 = StructType().add("uuid", StringType()).add("timecreated",
> TimestampType()).add("status", StringType())
>
> # example
> ##856095c6-cdec-485b-9da6-d78275bc0a25
> {"uuid":"856095c6-cdec-485b-9da6-d78275bc0a25",
> "timecreated":"2021-04-21T23:29:16", "status":true"}
>
>   Created the Streaming read
>
>
>  streamingDummy = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['dummy']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", config['MDVariables']['topicdummy']) \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> dummy2).alias("dummy_value"))
>
> streamingDummy.printSchema()
>   *print(streamingDummy.isStreaming())*
>
> When I run it I get
>
>
> root
>
>  |-- dummy_value: struct (nullable = true)
>
>  ||-- uuid: string (nullable = true)
>
>  ||-- timecreated: timestamp (nullable = true)
>
>  ||-- status: string (nullable = true)
>
>
> *'bool' object is not callable, quitting*
>
> I cannot figure out the last line!
>
>
> Thanks
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Spark Structured Streaming with PySpark throwing error in execution

2021-04-06 Thread Mich Talebzadeh
Hi all,

Following the upgrade to 3.1.1, I see a couple of issues.

Spark Structured Streaming (SSS) does not seem to work with the newer
spark-sql-kafka-0-10_2.12-3.1.1.jar for Spark. It throws


java.lang.NoSuchMethodError:
org.apache.spark.kafka010.KafkaTokenUtil$.needTokenUpdate(Ljava/util/Map;Lscala/Option;)Z

So I have to use the previous jar file spark-sql-kafka-0-10_2.12-3.0.1.jar

However, we can set that aside for now.

The second point is that with the following jars under $SPARK_HOME/jars


   1. spark-sql-kafka-0-10_2.12-3.0.1.jar
   2. commons-pool2-2.9.0.jar
   3. kafka-clients-2.7.0.jar


The SSS job runs in local mode as a single JVM

In YARN mode this fails with the following error:


*java.lang.NoClassDefFoundError: Could not initialize class
org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer*

Other executors running on the same node fail as well, with the above
error.

I have ensured that those jar files are available on all three nodes of the
cluster (on-prem), but still no luck.

Any ideas appreciated.
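
For completeness, one alternative worth trying is to let Spark resolve the Kafka connector and its dependencies itself instead of copying jars into $SPARK_HOME/jars on each node. A minimal sketch is below; the coordinates mirror the versions listed above, and whether this actually clears the YARN NoClassDefFoundError is an open question, not a confirmed fix.

from pyspark.sql import SparkSession

# Let Spark pull the connector (and, transitively, kafka-clients and
# commons-pool2) from Maven at submit time rather than relying on local jars.
spark = (SparkSession.builder
         .appName("sss_kafka_yarn_test")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
         .getOrCreate())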

Thanks



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 22 Feb 2021 at 22:55, Mich Talebzadeh 
wrote:

> Many thanks Muru. That was a great help!
>
> +------------------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
> |key                                 |value                                                                                                                |headers|
> +------------------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
> |b8f3bffd-42f6-4bb4-80fa-eafb6e1dd9a1|{"rowkey":"b8f3bffd-42f6-4bb4-80fa-eafb6e1dd9a1","ticker":"SBRY", "timeissued":"2021-02-20T19:10:18", "price":374.6} |null   |
> |d38c7771-9d1b-4cf1-94cf-97c8d4b7fd5e|{"rowkey":"d38c7771-9d1b-4cf1-94cf-97c8d4b7fd5e","ticker":"ORCL", "timeissued":"2021-02-20T19:10:22", "price":19.24} |null   |
> |1870f59a-2ef5-469d-a3e1-f756ab4de90c|{"rowkey":"1870f59a-2ef5-469d-a3e1-f756ab4de90c","ticker":"MRW", "timeissued":"2021-02-20T19:10:25", "price":263.05} |null   |
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 22 Feb 2021 at 22:46, muru  wrote:
>
>> You should include commons-pool2-2.9.0.jar and remove
>> spark-streaming-kafka-0-10_2.12-3.0.1.jar (unnecessary jar).
>>
>> On Mon, Feb 22, 2021 at 12:42 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Trying to make PySpark with PyCharm work with Structured Streaming
>>>
>>> spark-3.0.1-bin-hadoop3.2
>>> kafka_2.12-1.1.0
>>>
>>> Basic code
>>>
>>> from __future__ import print_function
>>> from src.config import config, hive_url
>>> import sys
>>> from sparkutils import sparkstuff as s
>>>
>>> class MDStreaming:
>>> def __init__(self, spark_session,spark_context):
>>> self.spark = spark_session
>>> self.sc = spark_context
>>> self.config = config
>>>
>>> def startStreaming(self):
>>> self.sc.setLogLevel("ERROR")
>>> try:
>>> kafkaReaderWithHeaders = self.spark \
>>> .readStream \
>>> .format("kafka") \
>>> .option("kafka.bootstrap.servers",
>>> config['MDVariables']['bootstrapServers'],) \
>>> .option("schema.registry.url",
>>> config['MDVariables']['schemaRegistryURL']) \
>>> .option("group.id", config['common']['appName']) \
>>> .option("zookeeper.connection.timeout.ms",
>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>> .option("rebalance.backoff.ms",
>>> config['MDVariables']['rebalanceBackoffMS']) \
>>> .option("zookeeper.session.timeout.ms",
>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>> .option("auto.commit.interval.ms",
>>> config['MDVariables']['autoCommitIntervalMS']) \
>>> .option("subscribe", config['MDVariables']['topic']) \
>>> .option("failOnDataLoss", "false") \
>>> .option("includeHeaders", "true") \
>>> .option("startingOffsets", "earliest") \
>>> .load()
>>> except Exception as e:
>>> print(f"""{e}, quitting""")
>>> sys.exit(1)
>>>
>>> kafkaReaderWithHeaders.sel

Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Gabor Somogyi
Just to be crystal clear, DStreams will be deprecated sooner or later and
there will be no support, so you are highly advised to migrate...

G


On Sun, 4 Apr 2021, 19:23 Ali Gouta,  wrote:

> Thanks Mich !
>
> Ali Gouta.
>
> On Sun, Apr 4, 2021 at 6:44 PM Mich Talebzadeh 
> wrote:
>
>> Hi Ali,
>>
>> The old saying of one experiment is worth a hundred hypotheses, still
>> stands.
>>
>> As per Test driven approach have a go at it and see what comes out. Forum
>> members including myself have reported on SSS in Spark user group, so you
>> are at home on this.
>>
>> HTH,
>>
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 4 Apr 2021 at 17:28, Ali Gouta  wrote:
>>
>>> Great, so SSS provides also an api that allows handling RDDs through
>>> dataFrames using foreachBatch. Still that I am not sure this is a
>>> good practice in general right ? Well, it depends on the use case in any
>>> way.
>>>
>>> Thank you so much for the hints !
>>>
>>> Best regards,
>>> Ali Gouta.
>>>
>>> On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Hi Ali,


 On a practical side, I have used both the old DStreams and the newer
 Spark structured streaming (SSS).


 SSS does a good job at micro-batch level in the form of


 foreachBatch(SendToSink)


  "foreach" performs custom write logic on each row and "foreachBatch" 
 *performs
 custom write logic *on each micro-batch through SendToSink function.
 foreachBatch(SendToSink) expects 2 parameters, first: micro-batch as
 DataFrame or Dataset and second: unique id for each batch. Using
 foreachBatch, we write each micro batch eventually to storage defined in
 our custom logic. In this case, we store the output of our streaming
 application to Redis or Google BigQuery table or any other sink



 In Dstream world you would have done something like below


 // Work on every Stream

 dstream.foreachRDD

 { pricesRDD =>

   if (!pricesRDD.isEmpty)  // data exists in RDD

   {

 and after some work from that RDD you would have created a DF (df)

 With regard to SSS, it allows you to use the passed DataFrame for your
 work. However, say in my case if you were interested in individual rows of
 micro-batch (say different collection of prices for different tickers
 (securities), you could create RDD from the dataframe

 for row in df.rdd.collect():
 ticker = row.ticker
 price = row.price


 With regard to foreach(process_row), I have not really tried it as we
 don't have a use case for it, so I assume your mileage varies as usual.


 HTH



view my Linkedin profile
 



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Sun, 4 Apr 2021 at 16:27, Ali Gouta  wrote:

> Thank you guys for your answers, I will dig more this new way of doing
> things and why not consider leaving the old Dstreams and use instead
> structured streaming. Hope that strucrured streaming + spark on Kubernetes
> works well and the combination is production ready.
>
> Best regards,
> Ali Gouta.
>
> Le dim. 4 avr. 2021 à 12:52, Jacek Laskowski  a
> écrit :
>
>> Hi,
>>
>> Just to add it to Gabor's excellent answer that checkpointing and
>> offsets are infrastructure-related and should not really be in the hands 
>> of
>> Spark devs who should instead focus on the business purpose of the code
>> (not offsets that are very low-level and not really important).
>>
>> BTW That's what happens in Kafka Streams too
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>>
>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <
>> gabor.g.somo...@gmail.com> wrote:
>>
>>> There is no way to store offsets in Kafka a

Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Ali Gouta
Thanks Mich !

Ali Gouta.

On Sun, Apr 4, 2021 at 6:44 PM Mich Talebzadeh 
wrote:

> Hi Ali,
>
> The old saying of one experiment is worth a hundred hypotheses, still
> stands.
>
> As per Test driven approach have a go at it and see what comes out. Forum
> members including myself have reported on SSS in Spark user group, so you
> are at home on this.
>
> HTH,
>
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 4 Apr 2021 at 17:28, Ali Gouta  wrote:
>
>> Great, so SSS provides also an api that allows handling RDDs through
>> dataFrames using foreachBatch. Still that I am not sure this is a
>> good practice in general right ? Well, it depends on the use case in any
>> way.
>>
>> Thank you so much for the hints !
>>
>> Best regards,
>> Ali Gouta.
>>
>> On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh 
>> wrote:
>>
>>> Hi Ali,
>>>
>>>
>>> On a practical side, I have used both the old DStreams and the newer
>>> Spark structured streaming (SSS).
>>>
>>>
>>> SSS does a good job at micro-batch level in the form of
>>>
>>>
>>> foreachBatch(SendToSink)
>>>
>>>
>>>  "foreach" performs custom write logic on each row and "foreachBatch" 
>>> *performs
>>> custom write logic *on each micro-batch through SendToSink function.
>>> foreachBatch(SendToSink) expects 2 parameters, first: micro-batch as
>>> DataFrame or Dataset and second: unique id for each batch. Using
>>> foreachBatch, we write each micro batch eventually to storage defined in
>>> our custom logic. In this case, we store the output of our streaming
>>> application to Redis or Google BigQuery table or any other sink
>>>
>>>
>>>
>>> In Dstream world you would have done something like below
>>>
>>>
>>> // Work on every Stream
>>>
>>> dstream.foreachRDD
>>>
>>> { pricesRDD =>
>>>
>>>   if (!pricesRDD.isEmpty)  // data exists in RDD
>>>
>>>   {
>>>
>>> and after some work from that RDD you would have created a DF (df)
>>>
>>> With regard to SSS, it allows you to use the passed DataFrame for your
>>> work. However, say in my case if you were interested in individual rows of
>>> micro-batch (say different collection of prices for different tickers
>>> (securities), you could create RDD from the dataframe
>>>
>>> for row in df.rdd.collect():
>>> ticker = row.ticker
>>> price = row.price
>>>
>>>
>>> With regard to foreach(process_row), I have not really tried it as we
>>> don't have a use case for it, so I assume your mileage varies as usual.
>>>
>>>
>>> HTH
>>>
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Sun, 4 Apr 2021 at 16:27, Ali Gouta  wrote:
>>>
 Thank you guys for your answers, I will dig more this new way of doing
 things and why not consider leaving the old Dstreams and use instead
 structured streaming. Hope that strucrured streaming + spark on Kubernetes
 works well and the combination is production ready.

 Best regards,
 Ali Gouta.

 Le dim. 4 avr. 2021 à 12:52, Jacek Laskowski  a
 écrit :

> Hi,
>
> Just to add it to Gabor's excellent answer that checkpointing and
> offsets are infrastructure-related and should not really be in the hands 
> of
> Spark devs who should instead focus on the business purpose of the code
> (not offsets that are very low-level and not really important).
>
> BTW That's what happens in Kafka Streams too
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 
>
>
> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <
> gabor.g.somo...@gmail.com> wrote:
>
>> There is no way to store offsets in Kafka and restart from the stored
>> offset. Structured Streaming stores offset in checkpoint and it restart
>> from there without any user code.
>>
>> Offsets can be stored with a listener but it can be only used for lag
>> calculation.
>>
>> BR,
>> G
>>
>>
>> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>>
>>> Hello,
>

Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Mich Talebzadeh
Hi Ali,

The old saying of one experiment is worth a hundred hypotheses, still
stands.

As per Test driven approach have a go at it and see what comes out. Forum
members including myself have reported on SSS in Spark user group, so you
are at home on this.

HTH,




   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 4 Apr 2021 at 17:28, Ali Gouta  wrote:

> Great, so SSS provides also an api that allows handling RDDs through
> dataFrames using foreachBatch. Still that I am not sure this is a
> good practice in general right ? Well, it depends on the use case in any
> way.
>
> Thank you so much for the hints !
>
> Best regards,
> Ali Gouta.
>
> On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh 
> wrote:
>
>> Hi Ali,
>>
>>
>> On a practical side, I have used both the old DStreams and the newer
>> Spark structured streaming (SSS).
>>
>>
>> SSS does a good job at micro-batch level in the form of
>>
>>
>> foreachBatch(SendToSink)
>>
>>
>>  "foreach" performs custom write logic on each row and "foreachBatch" 
>> *performs
>> custom write logic *on each micro-batch through SendToSink function.
>> foreachBatch(SendToSink) expects 2 parameters, first: micro-batch as
>> DataFrame or Dataset and second: unique id for each batch. Using
>> foreachBatch, we write each micro batch eventually to storage defined in
>> our custom logic. In this case, we store the output of our streaming
>> application to Redis or Google BigQuery table or any other sink
>>
>>
>>
>> In Dstream world you would have done something like below
>>
>>
>> // Work on every Stream
>>
>> dstream.foreachRDD
>>
>> { pricesRDD =>
>>
>>   if (!pricesRDD.isEmpty)  // data exists in RDD
>>
>>   {
>>
>> and after some work from that RDD you would have created a DF (df)
>>
>> With regard to SSS, it allows you to use the passed DataFrame for your
>> work. However, say in my case if you were interested in individual rows of
>> micro-batch (say different collection of prices for different tickers
>> (securities), you could create RDD from the dataframe
>>
>> for row in df.rdd.collect():
>> ticker = row.ticker
>> price = row.price
>>
>>
>> With regard to foreach(process_row), I have not really tried it as we
>> don't have a use case for it, so I assume your mileage varies as usual.
>>
>>
>> HTH
>>
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Sun, 4 Apr 2021 at 16:27, Ali Gouta  wrote:
>>
>>> Thank you guys for your answers, I will dig more this new way of doing
>>> things and why not consider leaving the old Dstreams and use instead
>>> structured streaming. Hope that strucrured streaming + spark on Kubernetes
>>> works well and the combination is production ready.
>>>
>>> Best regards,
>>> Ali Gouta.
>>>
>>> Le dim. 4 avr. 2021 à 12:52, Jacek Laskowski  a écrit :
>>>
 Hi,

 Just to add it to Gabor's excellent answer that checkpointing and
 offsets are infrastructure-related and should not really be in the hands of
 Spark devs who should instead focus on the business purpose of the code
 (not offsets that are very low-level and not really important).

 BTW That's what happens in Kafka Streams too

 Pozdrawiam,
 Jacek Laskowski
 
 https://about.me/JacekLaskowski
 "The Internals Of" Online Books 
 Follow me on https://twitter.com/jaceklaskowski

 


 On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi <
 gabor.g.somo...@gmail.com> wrote:

> There is no way to store offsets in Kafka and restart from the stored
> offset. Structured Streaming stores offset in checkpoint and it restart
> from there without any user code.
>
> Offsets can be stored with a listener but it can be only used for lag
> calculation.
>
> BR,
> G
>
>
> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>
>> Hello,
>>
>> I was reading the spark docs about spark structured streaming, since
>> we are thinking about updating our code base that today uses Dstreams,
>> hence spark streaming. Also, one main reason for this change that we want
>> to realize is that reading h

Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Ali Gouta
Great, so SSS also provides an API that allows handling RDDs through
dataFrames using foreachBatch. Still, I am not sure this is good
practice in general, right? Well, it depends on the use case
anyway.

Thank you so much for the hints !

Best regards,
Ali Gouta.

On Sun, Apr 4, 2021 at 6:11 PM Mich Talebzadeh 
wrote:

> Hi Ali,
>
>
> On a practical side, I have used both the old DStreams and the newer Spark
> structured streaming (SSS).
>
>
> SSS does a good job at micro-batch level in the form of
>
>
> foreachBatch(SendToSink)
>
>
>  "foreach" performs custom write logic on each row and "foreachBatch" 
> *performs
> custom write logic *on each micro-batch through SendToSink function.
> foreachBatch(SendToSink) expects 2 parameters, first: micro-batch as
> DataFrame or Dataset and second: unique id for each batch. Using
> foreachBatch, we write each micro batch eventually to storage defined in
> our custom logic. In this case, we store the output of our streaming
> application to Redis or Google BigQuery table or any other sink
>
>
>
> In Dstream world you would have done something like below
>
>
> // Work on every Stream
>
> dstream.foreachRDD
>
> { pricesRDD =>
>
>   if (!pricesRDD.isEmpty)  // data exists in RDD
>
>   {
>
> and after some work from that RDD you would have created a DF (df)
>
> With regard to SSS, it allows you to use the passed DataFrame for your
> work. However, say in my case if you were interested in individual rows of
> micro-batch (say different collection of prices for different tickers
> (securities), you could create RDD from the dataframe
>
> for row in df.rdd.collect():
> ticker = row.ticker
> price = row.price
>
>
> With regard to foreach(process_row), I have not really tried it as we
> don't have a use case for it, so I assume your mileage varies as usual.
>
>
> HTH
>
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 4 Apr 2021 at 16:27, Ali Gouta  wrote:
>
>> Thank you guys for your answers, I will dig more this new way of doing
>> things and why not consider leaving the old Dstreams and use instead
>> structured streaming. Hope that strucrured streaming + spark on Kubernetes
>> works well and the combination is production ready.
>>
>> Best regards,
>> Ali Gouta.
>>
>> Le dim. 4 avr. 2021 à 12:52, Jacek Laskowski  a écrit :
>>
>>> Hi,
>>>
>>> Just to add it to Gabor's excellent answer that checkpointing and
>>> offsets are infrastructure-related and should not really be in the hands of
>>> Spark devs who should instead focus on the business purpose of the code
>>> (not offsets that are very low-level and not really important).
>>>
>>> BTW That's what happens in Kafka Streams too
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books 
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> 
>>>
>>>
>>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi 
>>> wrote:
>>>
 There is no way to store offsets in Kafka and restart from the stored
 offset. Structured Streaming stores offset in checkpoint and it restart
 from there without any user code.

 Offsets can be stored with a listener but it can be only used for lag
 calculation.

 BR,
 G


 On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:

> Hello,
>
> I was reading the spark docs about spark structured streaming, since
> we are thinking about updating our code base that today uses Dstreams,
> hence spark streaming. Also, one main reason for this change that we want
> to realize is that reading headers in kafka messages is only supported in
> spark structured streaming and not in Dstreams.
>
> I was surprised to not see an obvious way to handle manually the
> offsets by committing the offsets to kafka. In spark streaming we used to
> do it with something similar to these lines of code:
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
>
>
> And this works perfectly ! Especially, this works very nice in case of
> job failure/restart... I am wondering how this can be achieved in spark
> structured streaming ?
>
> I read about checkpoints, and this reminds me the old way of doing
> things in spark 1.5/kafka0.8 and is not perfect since we are not de

Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Mich Talebzadeh
Hi Ali,


On a practical side, I have used both the old DStreams and the newer Spark
structured streaming (SSS).


SSS does a good job at micro-batch level in the form of


foreachBatch(SendToSink)


 "foreach" performs custom write logic on each row and "foreachBatch" *performs
custom write logic *on each micro-batch through SendToSink function.
foreachBatch(SendToSink) expects 2 parameters, first: micro-batch as
DataFrame or Dataset and second: unique id for each batch. Using
foreachBatch, we write each micro batch eventually to storage defined in
our custom logic. In this case, we store the output of our streaming
application to Redis or Google BigQuery table or any other sink



In the DStream world you would have done something like below:


// Work on every Stream

dstream.foreachRDD

{ pricesRDD =>

  if (!pricesRDD.isEmpty)  // data exists in RDD

  {

and after some work from that RDD you would have created a DF (df)

With regard to SSS, it allows you to use the passed DataFrame for your
work. However, if (say in my case) you were interested in individual rows of
the micro-batch (say a different collection of prices for different tickers
(securities)), you could create an RDD from the dataframe:

for row in df.rdd.collect():
ticker = row.ticker
price = row.price


With regard to foreach(process_row), I have not really tried it as we don't
have a use case for it, so I assume your mileage varies as usual.
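
For illustration, a minimal PySpark sketch of the foreachBatch pattern described above. The broker address, topic name and parquet path are assumptions, and the stand-in parquet write is where a real sink (Redis, BigQuery, etc.) would go.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("foreachBatchSketch").getOrCreate()

def send_to_sink(df, batch_id):
    # df is the micro-batch as a bounded DataFrame, batch_id its unique id.
    # Any batch-style write can go here; parquet is just a stand-in sink.
    (df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
       .write.mode("append").parquet("/tmp/prices_sink"))

streamingDF = (spark.readStream
               .format("kafka")
               .option("kafka.bootstrap.servers", "broker:9092")   # assumed address
               .option("subscribe", "md")                          # assumed topic
               .load())

query = (streamingDF.writeStream
         .foreachBatch(send_to_sink)
         .trigger(processingTime="2 seconds")
         .option("checkpointLocation", "/tmp/prices_chk")          # assumed path
         .start())

query.awaitTermination()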


HTH



   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 4 Apr 2021 at 16:27, Ali Gouta  wrote:

> Thank you guys for your answers, I will dig more this new way of doing
> things and why not consider leaving the old Dstreams and use instead
> structured streaming. Hope that strucrured streaming + spark on Kubernetes
> works well and the combination is production ready.
>
> Best regards,
> Ali Gouta.
>
> Le dim. 4 avr. 2021 à 12:52, Jacek Laskowski  a écrit :
>
>> Hi,
>>
>> Just to add it to Gabor's excellent answer that checkpointing and offsets
>> are infrastructure-related and should not really be in the hands of Spark
>> devs who should instead focus on the business purpose of the code (not
>> offsets that are very low-level and not really important).
>>
>> BTW That's what happens in Kafka Streams too
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>>
>> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi 
>> wrote:
>>
>>> There is no way to store offsets in Kafka and restart from the stored
>>> offset. Structured Streaming stores offset in checkpoint and it restart
>>> from there without any user code.
>>>
>>> Offsets can be stored with a listener but it can be only used for lag
>>> calculation.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>>>
 Hello,

 I was reading the spark docs about spark structured streaming, since we
 are thinking about updating our code base that today uses Dstreams, hence
 spark streaming. Also, one main reason for this change that we want to
 realize is that reading headers in kafka messages is only supported in
 spark structured streaming and not in Dstreams.

 I was surprised to not see an obvious way to handle manually the
 offsets by committing the offsets to kafka. In spark streaming we used to
 do it with something similar to these lines of code:

 stream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

   // some time later, after outputs have completed
   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}


 And this works perfectly ! Especially, this works very nice in case of
 job failure/restart... I am wondering how this can be achieved in spark
 structured streaming ?

 I read about checkpoints, and this reminds me the old way of doing
 things in spark 1.5/kafka0.8 and is not perfect since we are not deciding
 when to commit offsets by ourselves.

 Did I miss anything ? What would be the best way of committing offsets
 to kafka with spark structured streaming to the concerned consumer group ?

 Best regards,
 Ali Gouta.

>>>


Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Ali Gouta
Thank you guys for your answers. I will dig more into this new way of doing
things and consider leaving the old DStreams to use structured
streaming instead. Hope that structured streaming + Spark on Kubernetes
works well and the combination is production ready.

Best regards,
Ali Gouta.

Le dim. 4 avr. 2021 à 12:52, Jacek Laskowski  a écrit :

> Hi,
>
> Just to add it to Gabor's excellent answer that checkpointing and offsets
> are infrastructure-related and should not really be in the hands of Spark
> devs who should instead focus on the business purpose of the code (not
> offsets that are very low-level and not really important).
>
> BTW That's what happens in Kafka Streams too
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 
>
>
> On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi 
> wrote:
>
>> There is no way to store offsets in Kafka and restart from the stored
>> offset. Structured Streaming stores offset in checkpoint and it restart
>> from there without any user code.
>>
>> Offsets can be stored with a listener but it can be only used for lag
>> calculation.
>>
>> BR,
>> G
>>
>>
>> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>>
>>> Hello,
>>>
>>> I was reading the spark docs about spark structured streaming, since we
>>> are thinking about updating our code base that today uses Dstreams, hence
>>> spark streaming. Also, one main reason for this change that we want to
>>> realize is that reading headers in kafka messages is only supported in
>>> spark structured streaming and not in Dstreams.
>>>
>>> I was surprised to not see an obvious way to handle manually the offsets
>>> by committing the offsets to kafka. In spark streaming we used to do it
>>> with something similar to these lines of code:
>>>
>>> stream.foreachRDD { rdd =>
>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>
>>>   // some time later, after outputs have completed
>>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
>>>
>>>
>>> And this works perfectly ! Especially, this works very nice in case of
>>> job failure/restart... I am wondering how this can be achieved in spark
>>> structured streaming ?
>>>
>>> I read about checkpoints, and this reminds me the old way of doing
>>> things in spark 1.5/kafka0.8 and is not perfect since we are not deciding
>>> when to commit offsets by ourselves.
>>>
>>> Did I miss anything ? What would be the best way of committing offsets
>>> to kafka with spark structured streaming to the concerned consumer group ?
>>>
>>> Best regards,
>>> Ali Gouta.
>>>
>>


Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Jacek Laskowski
Hi,

Just to add to Gabor's excellent answer: checkpointing and offsets
are infrastructure-related and should not really be in the hands of Spark
devs, who should instead focus on the business purpose of the code (not
offsets, which are very low-level and not really important).

BTW That's what happens in Kafka Streams too

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Sun, Apr 4, 2021 at 12:28 PM Gabor Somogyi 
wrote:

> There is no way to store offsets in Kafka and restart from the stored
> offset. Structured Streaming stores offset in checkpoint and it restart
> from there without any user code.
>
> Offsets can be stored with a listener but it can be only used for lag
> calculation.
>
> BR,
> G
>
>
> On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:
>
>> Hello,
>>
>> I was reading the spark docs about spark structured streaming, since we
>> are thinking about updating our code base that today uses Dstreams, hence
>> spark streaming. Also, one main reason for this change that we want to
>> realize is that reading headers in kafka messages is only supported in
>> spark structured streaming and not in Dstreams.
>>
>> I was surprised to not see an obvious way to handle manually the offsets
>> by committing the offsets to kafka. In spark streaming we used to do it
>> with something similar to these lines of code:
>>
>> stream.foreachRDD { rdd =>
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>   // some time later, after outputs have completed
>>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
>>
>>
>> And this works perfectly ! Especially, this works very nice in case of
>> job failure/restart... I am wondering how this can be achieved in spark
>> structured streaming ?
>>
>> I read about checkpoints, and this reminds me the old way of doing things
>> in spark 1.5/kafka0.8 and is not perfect since we are not deciding when to
>> commit offsets by ourselves.
>>
>> Did I miss anything ? What would be the best way of committing offsets to
>> kafka with spark structured streaming to the concerned consumer group ?
>>
>> Best regards,
>> Ali Gouta.
>>
>


Re: Spark structured streaming + offset management in kafka + kafka headers

2021-04-04 Thread Gabor Somogyi
There is no way to store offsets in Kafka and restart from the stored
offset. Structured Streaming stores offsets in the checkpoint and restarts
from there without any user code.

Offsets can be stored with a listener, but they can only be used for lag
calculation.
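
For illustration, a minimal PySpark sketch of the checkpoint-driven behaviour described above (broker, topic and paths are assumptions). On a restart with the same checkpointLocation the query resumes from the recorded offsets with no user-side offset handling; startingOffsets only applies to the very first run.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpointSketch").getOrCreate()

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   # assumed address
      .option("subscribe", "events")                      # assumed topic
      .option("startingOffsets", "latest")                # only used on the first run
      .load())

# The checkpoint directory is where Structured Streaming records processed
# offsets; it replaces the DStream-era commitAsync bookkeeping.
query = (df.writeStream
         .format("parquet")
         .option("path", "/tmp/events_out")               # assumed output path
         .option("checkpointLocation", "/tmp/events_chk") # assumed checkpoint path
         .start())

query.awaitTermination()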

BR,
G


On Sat, 3 Apr 2021, 21:09 Ali Gouta,  wrote:

> Hello,
>
> I was reading the spark docs about spark structured streaming, since we
> are thinking about updating our code base that today uses Dstreams, hence
> spark streaming. Also, one main reason for this change that we want to
> realize is that reading headers in kafka messages is only supported in
> spark structured streaming and not in Dstreams.
>
> I was surprised to not see an obvious way to handle manually the offsets
> by committing the offsets to kafka. In spark streaming we used to do it
> with something similar to these lines of code:
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}
>
>
> And this works perfectly ! Especially, this works very nice in case of job
> failure/restart... I am wondering how this can be achieved in spark
> structured streaming ?
>
> I read about checkpoints, and this reminds me the old way of doing things
> in spark 1.5/kafka0.8 and is not perfect since we are not deciding when to
> commit offsets by ourselves.
>
> Did I miss anything ? What would be the best way of committing offsets to
> kafka with spark structured streaming to the concerned consumer group ?
>
> Best regards,
> Ali Gouta.
>


Re: Spark Structured Streaming Continuous Trigger Mode React to an External Trigger

2021-03-29 Thread shahrajesh2006
I tried to create a Dataset by loading a file and pass it as an argument to
a java method as below:

Dataset<Row> propertiesFile // Dataset created by loading a json property
file
Dataset<Row> streamingQuery // Dataset for streaming query

streamingQuery.map(
row -> myfunction(row, propertiesFile), Encoders.STRING());

This approach throws a NullPointerException while reading from the propertiesFile
Dataset.
Looks like I cannot pass the propertiesFile Dataset this way in Spark; I
have to join it with the streamingQuery Dataset.
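
The NullPointerException is consistent with using a Dataset handle inside executor-side code: a Dataset/DataFrame reference only has meaning on the driver. Purely as an illustration (in PySpark rather than Java, with made-up paths and column names), the two usual workarounds are to collect the small static data and broadcast plain values, or to do a stream-static join:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("propertiesSketch").getOrCreate()

# Small, static properties file read once on the driver (path is an assumption).
props_df = spark.read.json("/tmp/properties.json")

streaming_df = (spark.readStream.format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092")  # assumed
                .option("subscribe", "events")                     # assumed
                .load()
                .selectExpr("CAST(key AS STRING) AS key",
                            "CAST(value AS STRING) AS value"))

# Option 1: collect the small dataset into plain driver-side values and
# broadcast those; closures on executors use the broadcast, not the Dataset.
props = {row["name"]: row["value"] for row in props_df.collect()}  # assumed columns
props_bc = spark.sparkContext.broadcast(props)

# Option 2: a stream-static join, which Structured Streaming supports directly.
joined = streaming_df.join(props_df, streaming_df.key == props_df["name"], "left")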






--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Structured Streaming Continuous Trigger Mode React to an External Trigger

2021-03-27 Thread shahrajesh2006
Sorry, I don't have a diagram to share. Your understanding of how I am using
the spark application is right.

It is a kafka topic with 6 partitions, so spark is able to create 6 parallel
consumers/executors.

The thought of using Airflow is interesting. I will explore this option more.

The other idea - using a ProcessingTime trigger (every 60 seconds) to build a new
query that loads data from the s3 file and uses the results from this query with the
ContinuousTrigger query - I will try this option also.

Thanks again!

 







--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Structured Streaming Continuous Trigger Mode React to an External Trigger

2021-03-27 Thread Mich Talebzadeh
Hi,

It would help if you could include a high-level architecture diagram.

So as I understand it, you are running a single broker with 6 partitions (or 6
brokers with one partition each, the default).

You said you are using continuous triggering mode, meaning as an example


   foreach(ForeachWriter()).

 trigger(Trigger.Continuous("1 second")).



with a 1 sec checkpointing interval.


so the class ForeachWriter() will handle the transformation logic that will
apply to all executors.


To look for changes to s3 files, you need an orchestrator integrated
with Spark. *So this is all event driven*. Something like Airflow with a
file sensor. I am not sure what the granularity of your resolution is, but
you may be able to use micro-batching as well.


 foreachBatch(SendToBigQuery). \

 trigger(processingTime='2 seconds'). \

Whatever happens, that class within Spark should be able to poll for
changes and apply the correct logic where necessary.
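
To make the micro-batching suggestion concrete, here is a minimal PySpark sketch (bucket, file and field names are assumptions) in which every micro-batch re-reads the small control file before deciding what to do. Note that foreachBatch is a micro-batch facility and is not available under the continuous trigger, which is why the ProcessingTime route is mentioned above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("configPollSketch").getOrCreate()

stream_df = (spark.readStream.format("kafka")
             .option("kafka.bootstrap.servers", "broker:9092")  # assumed
             .option("subscribe", "events")                     # assumed
             .load())

def process_batch(df, batch_id):
    # Re-read the control file at every trigger; a change in its contents
    # changes the filter/transformation behaviour on the next micro-batch.
    flags = spark.read.json("s3a://my-bucket/control/flags.json").first()  # assumed path
    if flags is not None and flags["enabled"]:                             # assumed field
        (df.selectExpr("CAST(value AS STRING) AS value")
           .write.mode("append").parquet("/tmp/filtered_out"))             # stand-in sink

query = (stream_df.writeStream
         .foreachBatch(process_batch)
         .trigger(processingTime="60 seconds")
         .option("checkpointLocation", "/tmp/config_poll_chk")
         .start())

query.awaitTermination()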

HTH


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 27 Mar 2021 at 12:46, shahrajesh2006 
wrote:

> I am using Spark (Java) Structured Streaming in Continuous Trigger Mode
> connecting to a Kafka Broker. The use case is very simple: do some custom
> filter/transformation using a simple java method and ingest data into an
> external system. Kafka has 6 partitions - so the application is running 6
> executors. I have a requirement to change the behavior of the
> filter/transformation logic each executor is doing based on an external
> event (for example a property change in an s3 file). This is towards the goal
> of building a highly resilient architecture where the Spark application is
> running in two Cloud Regions and reacting to an external event.
>
> What is the best way to send a signal to a running spark application and
> propagate the same to each executor?
>
> The approach I have in mind is to create a new component which periodically
> refreshes the s3 file to look for a trigger event. This component will be
> integrated with the logic running on each Spark executor JVM.
>
> Please advise.
>
> Thanks and Regards
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Structured Streaming and Kafka message schema evolution

2021-03-17 Thread Mich Talebzadeh
Thanks Jungtaek.

I have reasons for this. So I will bring it up in another thread

Cheers,



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 15 Mar 2021 at 21:38, Jungtaek Lim 
wrote:

> If I understand correctly, SQL semantics are strict on column schema.
> Reading via Kafka data source doesn't require you to specify the schema as
> it provides the key and value as binary, but once you deserialize them,
> unless you keep the type as primitive (e.g. String), you'll need to specify
> the schema, like from_json requires you to.
>
> This wouldn't be changed even if you leverage Schema Registry - you'll
> need to provide the schema which is compatible with all schemas which
> records are associated with. I guess that's guaranteed if you use the
> latest version of the schema and you've changed the schema as
> "backward-compatible ways". I admit I haven't dealt with SR in SSS, but if
> you integrate the schema to the query plan, running query is unlikely
> getting the latest schema, but it still wouldn't matter as your query
> should only leverage the part of schema you've integrated, and the latest
> schema is "backward compatible" with the integrated schema.
>
> Hope this helps.
>
> Thanks
> Jungtaek Lim (HeartSaVioR)
>
> On Mon, Mar 15, 2021 at 9:25 PM Mich Talebzadeh 
> wrote:
>
>> This is just a query.
>>
>> In general Kafka-connect requires means to register that schema such that
>> producers and consumers understand that. It also allows schema evolution,
>> i.e. changes to metadata that identifies the structure of data sent via
>> topic.
>>
>> When we stream a kafka topic into (Spark Structured Streaming (SSS), the
>> assumption is that by the time Spark processes that data, its structure
>> can be established. With foreachBatch, we create a dataframe on top of
>> incoming batches of Json messages and the dataframe can be interrogated.
>> However, the processing may fail if another column is added to the topic
>> and the consumer (in this case SSS) is not aware of it. How can this change
>> of schema be verified?
>>
>> Thanks
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Re: Spark Structured Streaming and Kafka message schema evolution

2021-03-15 Thread Jungtaek Lim
If I understand correctly, SQL semantics are strict on column schema.
Reading via Kafka data source doesn't require you to specify the schema as
it provides the key and value as binary, but once you deserialize them,
unless you keep the type as primitive (e.g. String), you'll need to specify
the schema, like from_json requires you to.

This wouldn't be changed even if you leverage Schema Registry - you'll need
to provide a schema which is compatible with all schemas that the records
are associated with. I guess that's guaranteed if you use the latest
version of the schema and you've changed the schema in "backward-compatible
ways". I admit I haven't dealt with SR in SSS, but if you integrate the
schema into the query plan, the running query is unlikely to get the latest
schema, but it still wouldn't matter as your query should only leverage the
part of the schema you've integrated, and the latest schema is "backward
compatible" with the integrated schema.

Hope this helps.

Thanks
Jungtaek Lim (HeartSaVioR)

On Mon, Mar 15, 2021 at 9:25 PM Mich Talebzadeh 
wrote:

> This is just a query.
>
> In general Kafka-connect requires means to register that schema such that
> producers and consumers understand that. It also allows schema evolution,
> i.e. changes to metadata that identifies the structure of data sent via
> topic.
>
> When we stream a kafka topic into (Spark Structured Streaming (SSS), the
> assumption is that by the time Spark processes that data, its structure
> can be established. With foreachBatch, we create a dataframe on top of
> incoming batches of Json messages and the dataframe can be interrogated.
> However, the processing may fail if another column is added to the topic
> and the consumer (in this case SSS) is not aware of it. How can this change
> of schema be verified?
>
> Thanks
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Spark Structured Streaming from GCS files

2021-03-15 Thread Gowrishankar Sunder
Our online services running in GCP collect data from our clients and write
it to GCS under time-partitioned folders like /mm/dd/hh/mm
(current-time) or similar ones. We need these files to be processed in
real-time from Spark. As for the runtime, we plan to run it either on
Dataproc or K8s.

- Gowrishankar Sunder


On Mon, Mar 15, 2021 at 12:13 PM Mich Talebzadeh 
wrote:

>
> Hi,
>
> I looked at the stackoverflow reference.
>
> The first question that comes to my mind is how you are populating these
> gcs buckets? Are you shifting data from on-prem and landing them in the
> buckets and  creating a new folder at the given interval?
>
> Where will you be running your Spark Structured Streaming? On dataproics?
>
> HTH
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 15 Mar 2021 at 19:00, Gowrishankar Sunder 
> wrote:
>
>> Hi,
>>We have a use case to stream files from GCS time-partitioned folders
>> and perform structured streaming queries on top of them. I have detailed
>> the use cases and requirements in this Stackoverflow question
>> 
>>  but
>> at a high level, the problems I am facing are listed below and would like
>> guidance on the best approach to use
>>
>>- Custom source APIs for Structured Streaming are undergoing major
>>changes (including the new Table API support) and the documentation does
>>not capture much details when it comes to building custom sources. I was
>>wondering if the current APIs are expected to remain stable through the
>>targeted 3.2 release and if there are examples on how to use them for my
>>use case.
>>- The default FileStream
>>
>> 
>>source looks up a static glob path which might not scale when the job runs
>>for days with multiple time partitions. But it has some really useful
>>features handling files - supports all major source formats (AVRO, 
>> Parquet,
>>JSON etc...), takes care of compression and partitioning large files into
>>sub-tasks - all of which I need to implement again for the current custom
>>source APIs as they stand. I was wondering if I can still somehow make use
>>of them while solving the scaling time partitioning file globbing issue.
>>
>> Thanks
>>
>>
>>


Re: Spark Structured Streaming from GCS files

2021-03-15 Thread Mich Talebzadeh
Hi,

I looked at the stackoverflow reference.

The first question that comes to my mind is how you are populating these
gcs buckets? Are you shifting data from on-prem and landing them in the
buckets and creating a new folder at the given interval?

Where will you be running your Spark Structured Streaming? On Dataproc?

HTH


LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 15 Mar 2021 at 19:00, Gowrishankar Sunder 
wrote:

> Hi,
>We have a use case to stream files from GCS time-partitioned folders
> and perform structured streaming queries on top of them. I have detailed
> the use cases and requirements in this Stackoverflow question
> 
>  but
> at a high level, the problems I am facing are listed below and would like
> guidance on the best approach to use
>
>- Custom source APIs for Structured Streaming are undergoing major
>changes (including the new Table API support) and the documentation does
>not capture much details when it comes to building custom sources. I was
>wondering if the current APIs are expected to remain stable through the
>targeted 3.2 release and if there are examples on how to use them for my
>use case.
>- The default FileStream
>
> 
>source looks up a static glob path which might not scale when the job runs
>for days with multiple time partitions. But it has some really useful
>features handling files - supports all major source formats (AVRO, Parquet,
>JSON etc...), takes care of compression and partitioning large files into
>sub-tasks - all of which I need to implement again for the current custom
>source APIs as they stand. I was wondering if I can still somehow make use
>of them while solving the scaling time partitioning file globbing issue.
>
> Thanks
>
>
>


Re: Spark structured streaming Stuck on Batch = 0 on spark 3.1.1, Dataproc cluster

2021-03-01 Thread Mich Talebzadeh
By the way, as per the streaming doc, one can monitor streaming status with:

 result = streamingDataFrame. \
  writeStream. \
  outputMode('append'). \
  option("truncate", "false"). \
  format('console'). \
  start()

print(result.status)
print(result.recentProgress)
print(result.lastProgress)

Ok so they should tell us something.

When I run it where streaming data is displayed (on-premise) I see below
(format('console')

{'message': 'Initializing sources', 'isDataAvailable': False,
'isTriggerActive': False}
[]
None
---
Batch: 0
---
+--+--+--+-++---+---+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+--+--+--+-++---+---+
+--+--+--+-++---+---+

---
Batch: 1
---
+------------------------------------+------+-------------------+------+--------+-------+----------------------+
|rowkey                              |ticker|timeissued         |price |currency|op_type|op_time               |
+------------------------------------+------+-------------------+------+--------+-------+----------------------+
|e4c02434-fa1f-4e8e-ad94-40c2782e9681|MRW   |2021-03-01 15:11:44|293.75|GBP     |1      |2021-03-01 15:12:16.49|

etc ..

On the other hand when I run it in Google Cloud cluster I see exactly the
same diagnostics BUT no data!

{'message': 'Initializing sources', 'isDataAvailable': False,
'isTriggerActive': False}
[]
None
---
Batch: 0
---
+--+--+--+-++---+---+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+--+--+--+-++---+---+
+--+--+--+-++---+---+

So the monitoring does not say anything.

What does the following signify?

print(result.status)
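
For what it is worth, status is the query's current state (the 'message', 'isDataAvailable' and 'isTriggerActive' fields printed above), while recentProgress/lastProgress describe completed micro-batches. A small sketch of polling those fields to check whether any rows are actually arriving (the loop bounds are arbitrary, and 'result' is assumed to be the started query from the snippet above):

import time

for _ in range(10):
    time.sleep(10)
    print(result.status)          # e.g. {'message': 'Waiting for data to arrive', ...}
    progress = result.lastProgress
    if progress is not None:
        # numInputRows staying at 0 for every batch suggests the source is
        # reachable but no records are being delivered (topic name, offsets,
        # network path between Dataproc and the Kafka broker, and so on).
        print(progress["batchId"],
              progress["numInputRows"],
              progress["sources"][0].get("description"))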


Thanks



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 27 Feb 2021 at 18:26, Mich Talebzadeh 
wrote:

> Hi,
>
> I have a Pyspark program that uses *Spark 3.0.1* to read Kafka topic and
> write it to Google BigQuery. This works fine on Premise and loops over
> micro-batch of data.
>
>  def fetch_data(self):
> self.sc.setLogLevel("ERROR")
> #{"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT",
> "timeissued":"2021-02-23T08:42:23", "price":31.12}
> schema = StructType().add("rowkey", StringType()).add("ticker",
> StringType()).add("timeissued", TimestampType()).add("price", FloatType())
> try:
> # construct a streaming dataframe streamingDataFrame that
> subscribes to topic config['MDVariables']['topic']) -> md (market data)
> streamingDataFrame = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['appName']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", config['MDVariables']['topic']) \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
> #streamingDataFrame.printSchema()
>
> """
>"foreach" performs custom write logic on each row and
> "foreachBatch" performs custom write logic on each micro-batch through
> SendToBigQuery function
> foreachBatch(SendToBigQuery) expects 2 parameters, first:
> micro-batch as DataFrame or Dataset and second: unique id for each batch
>Using foreachBatch, we write each micro batch to storage
> define

Re: Spark Structured Streaming with PySpark throwing error in execution

2021-02-22 Thread Mich Talebzadeh
Many thanks Muru. That was a great help!

+------------------------------------+---------------------------------------------------------------------------------------------------+-------+
|key                                 |value                                                                                              |headers|
+------------------------------------+---------------------------------------------------------------------------------------------------+-------+
|b8f3bffd-42f6-4bb4-80fa-eafb6e1dd9a1|{"rowkey":"b8f3bffd-42f6-4bb4-80fa-eafb6e1dd9a1","ticker":"SBRY", "timeissued":"2021-02-20T19:10:18", "price":374.6} |null   |
|d38c7771-9d1b-4cf1-94cf-97c8d4b7fd5e|{"rowkey":"d38c7771-9d1b-4cf1-94cf-97c8d4b7fd5e","ticker":"ORCL", "timeissued":"2021-02-20T19:10:22", "price":19.24} |null   |
|1870f59a-2ef5-469d-a3e1-f756ab4de90c|{"rowkey":"1870f59a-2ef5-469d-a3e1-f756ab4de90c","ticker":"MRW", "timeissued":"2021-02-20T19:10:25", "price":263.05} |null   |
+------------------------------------+---------------------------------------------------------------------------------------------------+-------+




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 22 Feb 2021 at 22:46, muru  wrote:

> You should include commons-pool2-2.9.0.jar and remove
> spark-streaming-kafka-0-10_2.12-3.0.1.jar (unnecessary jar).
>
> On Mon, Feb 22, 2021 at 12:42 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> Trying to make PySpark with PyCharm work with Structured Streaming
>>
>> spark-3.0.1-bin-hadoop3.2
>> kafka_2.12-1.1.0
>>
>> Basic code
>>
>> from __future__ import print_function
>> from src.config import config, hive_url
>> import sys
>> from sparkutils import sparkstuff as s
>>
>> class MDStreaming:
>> def __init__(self, spark_session,spark_context):
>> self.spark = spark_session
>> self.sc = spark_context
>> self.config = config
>>
>> def startStreaming(self):
>> self.sc.setLogLevel("ERROR")
>> try:
>> kafkaReaderWithHeaders = self.spark \
>> .readStream \
>> .format("kafka") \
>> .option("kafka.bootstrap.servers",
>> config['MDVariables']['bootstrapServers'],) \
>> .option("schema.registry.url",
>> config['MDVariables']['schemaRegistryURL']) \
>> .option("group.id", config['common']['appName']) \
>> .option("zookeeper.connection.timeout.ms",
>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>> .option("rebalance.backoff.ms",
>> config['MDVariables']['rebalanceBackoffMS']) \
>> .option("zookeeper.session.timeout.ms",
>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>> .option("auto.commit.interval.ms",
>> config['MDVariables']['autoCommitIntervalMS']) \
>> .option("subscribe", config['MDVariables']['topic']) \
>> .option("failOnDataLoss", "false") \
>> .option("includeHeaders", "true") \
>> .option("startingOffsets", "earliest") \
>> .load()
>> except Exception as e:
>> print(f"""{e}, quitting""")
>> sys.exit(1)
>>
>> kafkaReaderWithHeaders.selectExpr("CAST(key AS STRING)",
>> "CAST(value AS STRING)", "headers") \
>> .writeStream \
>> .format("console") \
>> .option("truncate","false") \
>> .start() \
>> .awaitTermination()
>> kafkaReaderWithHeaders.printSchema()
>>
>> if __name__ == "__main__":
>> appName = config['common']['appName']
>> spark_session = s.spark_session(appName)
>> spark_context = s.sparkcontext()
>> mdstreaming = MDStreaming(spark_session, spark_context)
>> mdstreaming.startStreaming()
>>
>> I have used the following jars in $SYBASE_HOME/jars
>>
>>   spark-sql-kafka-0-10_2.12-3.0.1.jar
>>  kafka-clients-2.7.0.jar
>>  spark-streaming-kafka-0-10_2.12-3.0.1.jar
>>  spark-token-provider-kafka-0-10_2.12-3.0.1.jar
>>
>> and also in $SPARK_HOME/conf/spark-defaults.conf
>>
>> spark.driver.extraClassPath$SPARK_HOME/jars/*.jar
>> spark.executor.extraClassPath  $SPARK_HOME/jars/*.jar
>>
>>
>> The error is this:
>>
>> 2021-02-22 16:40:38,886 ERROR executor.Executor: Exception in task 3.0 in
>> stage 0.0 (TID 3)
>> *java.lang.NoClassDefFoundError: Could not initialize class
>> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$*
>> at
>> org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.(KafkaBatchPartitionReader.scala:52)
>> at
>> org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:40)
>> at
>> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:60)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>> at org.apache.spark.rdd.RDD.iterator(

Re: Spark Structured Streaming with PySpark throwing error in execution

2021-02-22 Thread muru
You should include commons-pool2-2.9.0.jar and remove
spark-streaming-kafka-0-10_2.12-3.0.1.jar (unnecessary jar).
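
As an aside, one way to avoid juggling the individual jars by hand is to let Spark
resolve the Kafka integration and its transitive dependencies (kafka-clients,
commons-pool2, the token provider) from Maven. A minimal sketch, assuming Spark 3.0.1
with Scala 2.12 and an illustrative app name:

from pyspark.sql import SparkSession

# spark.jars.packages pulls spark-sql-kafka-0-10 and its transitive deps at start-up
spark = (SparkSession.builder
         .appName("md_streaming")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
         .getOrCreate())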

On Mon, Feb 22, 2021 at 12:42 PM Mich Talebzadeh 
wrote:

> Hi,
>
> Trying to make PySpark with PyCharm work with Structured Streaming
>
> spark-3.0.1-bin-hadoop3.2
> kafka_2.12-1.1.0
>
> Basic code
>
> from __future__ import print_function
> from src.config import config, hive_url
> import sys
> from sparkutils import sparkstuff as s
>
> class MDStreaming:
> def __init__(self, spark_session,spark_context):
> self.spark = spark_session
> self.sc = spark_context
> self.config = config
>
> def startStreaming(self):
> self.sc.setLogLevel("ERROR")
> try:
> kafkaReaderWithHeaders = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['appName']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", config['MDVariables']['topic']) \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "earliest") \
> .load()
> except Exception as e:
> print(f"""{e}, quitting""")
> sys.exit(1)
>
> kafkaReaderWithHeaders.selectExpr("CAST(key AS STRING)",
> "CAST(value AS STRING)", "headers") \
> .writeStream \
> .format("console") \
> .option("truncate","false") \
> .start() \
> .awaitTermination()
> kafkaReaderWithHeaders.printSchema()
>
> if __name__ == "__main__":
> appName = config['common']['appName']
> spark_session = s.spark_session(appName)
> spark_context = s.sparkcontext()
> mdstreaming = MDStreaming(spark_session, spark_context)
> mdstreaming.startStreaming()
>
> I have used the following jars in $SYBASE_HOME/jars
>
>   spark-sql-kafka-0-10_2.12-3.0.1.jar
>  kafka-clients-2.7.0.jar
>  spark-streaming-kafka-0-10_2.12-3.0.1.jar
>  spark-token-provider-kafka-0-10_2.12-3.0.1.jar
>
> and also in $SPARK_HOME/conf/spark-defaults.conf
>
> spark.driver.extraClassPath$SPARK_HOME/jars/*.jar
> spark.executor.extraClassPath  $SPARK_HOME/jars/*.jar
>
>
> The error is this:
>
> 2021-02-22 16:40:38,886 ERROR executor.Executor: Exception in task 3.0 in
> stage 0.0 (TID 3)
> *java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$*
> at
> org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.(KafkaBatchPartitionReader.scala:52)
> at
> org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:40)
> at
> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:127)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> pyspark.sql.utils.StreamingQueryException: Writing job aborted.
> === Streaming Query ===
> Identifier: [id = 0706dcd1-01de-4d7f-a362-81257b45e38c, runId =
> d61d9807-6f6c-4de1-a60f-8ae31c8a3c36]
> Current Committed Offsets: {}
> Current Available Offsets: {KafkaV2[Subscribe[md]]:
> {"md":{"8":1905351,"2":1907338,"5":1905175,"4":1904978,"7":1907880,"1":1903797,"3":1906072,"6":1904936,"0":1903896}}}
>
> Current State: ACTIVE
> Thread

Re: Spark structured streaming - efficient way to do lots of aggregations on the same input files

2021-01-22 Thread Filip
Hi,

I don't have any code for the forEachBatch approach, I mentioned it due to
this response to my question on SO:
https://stackoverflow.com/a/65803718/1017130

I have added some very simple code below that I think shows what I'm trying
to do:
val schema = StructType(
Array(
StructField("senderId1", LongType),
StructField("senderId2", LongType),
StructField("destId1", LongType),
StructField("eventType", IntegerType)
StructField("cost", LongType)
)
)

val fileStreamDf = spark.readStream.schema(schema).option("delimiter",
"\t").csv("D:\\SparkTest")

fileStreamDf.createOrReplaceTempView("myTable")

spark.sql("SELECT senderId1, count(*) AS num_events FROM myTable GROUP BY
senderId1 HAVING count(*) >
1").writeStream.format("console").outputMode("complete").start()
spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myTable WHERE
eventType = 3 GROUP BY senderId2 HAVING sum(cost) >
500").writeStream.format("console").outputMode("complete").start()
spark.sql("SELECT destId1, count(*) AS num_events WHERE event_type = 5 GROUP
BY destId1 HAVING count(*) >
1000").writeStream.format("console").outputMode("complete").start()

Of course, this is simplified; there are a lot more columns and the queries
should also group by time period, but I didn't want to complicate it.
With this example, I have 3 queries running on the same input files, but
Spark would need to read the files from disk 3 times. These extra reads are
what I'm trying to avoid.
In the real application, the number of queries would be a lot higher and
dynamic (they are generated in response to some configurations made by the
end users).
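
For reference, a minimal PySpark sketch of the foreachBatch idea from that SO answer:
the files are read once per micro-batch, the batch is cached, and all the aggregations
run against the cached batch instead of re-reading the input (schema, path and the two
queries mirror the simplified example above):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, IntegerType

spark = SparkSession.builder.appName("multi_agg").getOrCreate()

schema = StructType([
    StructField("senderId1", LongType()),
    StructField("senderId2", LongType()),
    StructField("destId1", LongType()),
    StructField("eventType", IntegerType()),
    StructField("cost", LongType()),
])

file_stream_df = (spark.readStream.schema(schema)
                  .option("delimiter", "\t").csv("D:\\SparkTest"))

def run_aggregations(batch_df, batch_id):
    # materialise the micro-batch once and reuse it for every aggregation
    batch_df.persist()
    batch_df.createOrReplaceTempView("myTable")
    spark.sql("SELECT senderId1, count(*) AS num_events FROM myTable "
              "GROUP BY senderId1 HAVING count(*) > 1").show()
    spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myTable "
              "WHERE eventType = 3 GROUP BY senderId2 HAVING sum(cost) > 500").show()
    batch_df.unpersist()

(file_stream_df.writeStream
    .foreachBatch(run_aggregations)
    .start()
    .awaitTermination())

Note this produces per-batch results only; keeping running totals across batches is
the "manually updating my aggregates" part mentioned above.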



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark structured streaming - efficient way to do lots of aggregations on the same input files

2021-01-22 Thread Jacek Laskowski
Hi Filip,

Care to share the code behind "The only thing I found so far involves using
forEachBatch and manually updating my aggregates. "?

I'm not completely sure I understand your use case and hope the code could
shed more light on it. Thank you.

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Thu, Jan 21, 2021 at 5:05 PM Filip 
wrote:

> Hi,
>
> I'm considering using Apache Spark for the development of an application.
> This would replace a legacy program which reads CSV files and does lots
> (tens/hundreds) of aggregations on them. The aggregations are fairly
> simple:
> counts, sums, etc. while applying some filtering conditions on some of the
> columns.
>
> I prefer using structured streaming for its simplicity and low-latency. I'd
> also like to use full SQL queries (via createOrReplaceTempView). However,
> doing multiple queries means Spark will re-read the input files for each
> one
> of them. This seems very inefficient for my use-case.
>
> Does anyone have any suggestions? The only thing I found so far involves
> using forEachBatch and manually updating my aggregates. But, I think there
> should be a simpler solution for this use case.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


RE: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Boris Litvak
There you go:


from typing import List

from pyspark.sql.functions import col, explode, udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType


@udf(returnType=ArrayType(StringType()))
def reader_udf(filename: str) -> List[str]:
    with open(filename, "r") as f:
        return f.read().split('\n')


def run_locally():
    with utils.build_spark_session("Local", local=True) as spark:
        df = spark.readStream.csv(r'testdata',
                                  schema=StructType([StructField('filename', StringType(), True)]))
        df = df.withColumn('content', reader_udf(col('filename')))
        q = (df.select(explode('content'))
             .writeStream.queryName('test').format('console')
             .option('truncate', False).start())
        q.awaitTermination()
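
Since the files here hold one JSON record per line, the exploded lines can then be
turned into typed columns with from_json; a minimal sketch of that extra step on the
df built above (the record schema is a made-up placeholder, not the actual one):

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

record_schema = StructType([
    StructField("id", StringType()),
    StructField("value", DoubleType()),
])

parsed = (df
          .select(explode(col("content")).alias("line"))                # one row per JSON line
          .select(from_json(col("line"), record_schema).alias("rec"))   # parse each line
          .select("rec.*"))                                             # flatten to typed columns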


From: Amit Joshi 
Sent: Monday, 18 January 2021 20:22
To: Boris Litvak 
Cc: spark-user 
Subject: Re: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi Boris,

Thanks for your code block.
I understood what you are trying to achieve in the code.

But content in the file are json records seperated by new line.
And we have to make the dataframe out of it, as some processing has to be done 
on it.

Regards
Amit
On Monday, January 18, 2021, Boris Litvak 
mailto:boris.lit...@skf.com>> wrote:
HI Amit,

I was thinking along the lines of (python):


@udf(returnType=StringType())
def reader_udf(filename: str) -> str:
with open(filename, "r") as f:
return f.read()


def run_locally():
with utils.build_spark_session("Local", local=True) as spark:
df = spark.readStream.csv(r'testdata', 
schema=StructType([StructField('filename', StringType(), True)]))
df = df.withColumn('content', reader_udf(col('filename')))
q = 
df.select('content').writeStream.queryName('test').format('console').start()
q.awaitTermination()

Now each row contains the contents of the files, provided they are not large 
you can foreach() over the df/rdd and do whatever you want with it, such as 
json.loads()/etc.
If you know the shema of the jsons, you can later explode() them into a flat 
DF, ala 
https://stackoverflow.com/questions/38243717/spark-explode-nested-json-with-array-in-scala

Note that unless I am missing something you cannot access spark session from 
foreach as code is not running on the driver.
Please say if it makes sense or did I miss anything.

Boris

From: Amit Joshi mailto:mailtojoshia...@gmail.com>>
Sent: Monday, 18 January 2021 17:10
To: Boris Litvak mailto:boris.lit...@skf.com>>
Cc: spark-user mailto:user@spark.apache.org>>
Subject: Re: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi Boris,

I need to do processing on the data present in the path.
That is the reason I am trying to make the dataframe.

Can you please provide the example of your solution?

Regards
Amit

On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak 
mailto:boris.lit...@skf.com>> wrote:
Hi Amit,

Why won’t you just map()/mapXXX() the kafkaDf with the mapping function that 
reads the paths?
Also, do you really have to read the json into an additional dataframe?

Thanks, Boris

From: Amit Joshi mailto:mailtojoshia...@gmail.com>>
Sent: Monday, 18 January 2021 15:04
To: spark-user mailto:user@spark.apache.org>>
Subject: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi ,

I have a use case where the file path of the json records stored in s3 are 
coming as a kafka
message in kafka. I have to process the data using spark structured streaming.

The design which I thought is as follows:
1. In kafka Spark structures streaming, read the message containing the data 
path.
2. Collect the message record in driver. (Messages are small in sizes)
3. Create the dataframe from the datalocation.


kafkaDf.select($"value".cast(StringType))
  .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {

//rough code

//collec to driver

val records = batchDf.collect()

//create dataframe and process
records foreach((rec: Row) =>{
  println("records:##",rec.toString())
  val path = rec.getAs[String]("data_path")

  val dfToProcess =spark.read.json(path)

  

})

}

I would like to know the views, if this approach is fine? Specifically if there 
is some problem with

with creating the dataframe after calling collect.

If there is any better approach, please let know the same.



Regards

Amit Joshi


Re: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Brian Wylie
Coming in late.. but if I understand correctly, you can simply use the fact
that spark.read (or readStream) will also accept a directory argument. If
you provide a directory spark will automagically pull in all the files in
that directory.

"""Reading in multiple files example"""
spark = 
SparkSession.builder.master('local[*]').appName('spark_streaming').getOrCreate()

# Schema for incoming data
json_schema = StructType([StructField("username", StringType(), True),
  StructField("name", StringType(), True),
  StructField("sex", StringType(), True),
  StructField("address", StringType(), True),
  StructField("mail", StringType(), True),
  StructField("birthdate", DateType(), True),
  StructField("work", StringType(), True),
  StructField("salary", IntegerType(), True),
  StructField("timestamp", TimestampType(), True)])

# Read in a bunch of data files (files are in JSON per line format)
data_directory_path = './data/my_directory'


# Create a Spark DF with a bunch of files
spark_df = spark.read.schema(json_schema).json(data_directory_path)




On Mon, Jan 18, 2021 at 11:22 AM Amit Joshi 
wrote:

> Hi Boris,
>
> Thanks for your code block.
> I understood what you are trying to achieve in the code.
>
> But content in the file are json records seperated by new line.
> And we have to make the dataframe out of it, as some processing has to be
> done on it.
>
> Regards
> Amit
> On Monday, January 18, 2021, Boris Litvak  wrote:
>
>> HI Amit,
>>
>>
>>
>> I was thinking along the lines of (python):
>>
>>
>>
>>
>> @udf(returnType=StringType())
>> def reader_udf(filename: str) -> str:
>> with open(filename, "r") as f:
>> return f.read()
>>
>>
>> def run_locally():
>> with utils.build_spark_session("Local", local=True) as spark:
>> df = spark.readStream.csv(r'testdata', schema
>> =StructType([StructField('filename', StringType(), True)]))
>> df = df.withColumn('content', reader_udf(col('filename')))
>> q = df.select('content').writeStream.queryName('test').format(
>> 'console').start()
>> q.awaitTermination()
>>
>>
>>
>> Now each row contains the contents of the files, provided they are not
>> large you can foreach() over the df/rdd and do whatever you want with it,
>> such as json.loads()/etc.
>>
>> If you know the shema of the jsons, you can later explode() them into a
>> flat DF, ala
>> https://stackoverflow.com/questions/38243717/spark-explode-nested-json-with-array-in-scala
>>
>>
>>
>> Note that unless I am missing something you cannot access spark session
>> from foreach as code is not running on the driver.
>>
>> Please say if it makes sense or did I miss anything.
>>
>>
>>
>> Boris
>>
>>
>>
>> *From:* Amit Joshi 
>> *Sent:* Monday, 18 January 2021 17:10
>> *To:* Boris Litvak 
>> *Cc:* spark-user 
>> *Subject:* Re: [Spark Structured Streaming] Processing the data path
>> coming from kafka.
>>
>>
>>
>> Hi Boris,
>>
>>
>>
>> I need to do processing on the data present in the path.
>>
>> That is the reason I am trying to make the dataframe.
>>
>>
>>
>> Can you please provide the example of your solution?
>>
>>
>>
>> Regards
>>
>> Amit
>>
>>
>>
>> On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak 
>> wrote:
>>
>> Hi Amit,
>>
>>
>>
>> Why won’t you just map()/mapXXX() the kafkaDf with the mapping function
>> that reads the paths?
>>
>> Also, do you really have to read the json into an additional dataframe?
>>
>>
>>
>> Thanks, Boris
>>
>>
>>
>> *From:* Amit Joshi 
>> *Sent:* Monday, 18 January 2021 15:04
>> *To:* spark-user 
>> *Subject:* [Spark Structured Streaming] Processing the data path coming
>> from kafka.
>>
>>
>>
>> Hi ,
>>
>>
>>
>> I have a use case where the file path of the json records stored in s3
>> are coming as a kafka
>>
>> message in kafka. I have to process the data using spark structured
>> streaming.
>>
>>
>>
>>

Re: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Amit Joshi
Hi Boris,

Thanks for your code block.
I understood what you are trying to achieve in the code.

But the content in the file is JSON records separated by newlines.
And we have to make the dataframe out of it, as some processing has to be
done on it.

Regards
Amit
On Monday, January 18, 2021, Boris Litvak  wrote:

> HI Amit,
>
>
>
> I was thinking along the lines of (python):
>
>
>
>
> @udf(returnType=StringType())
> def reader_udf(filename: str) -> str:
> with open(filename, "r") as f:
> return f.read()
>
>
> def run_locally():
> with utils.build_spark_session("Local", local=True) as spark:
> df = spark.readStream.csv(r'testdata', schema=StructType([
> StructField('filename', StringType(), True)]))
> df = df.withColumn('content', reader_udf(col('filename')))
> q = df.select('content').writeStream.queryName('test').format(
> 'console').start()
> q.awaitTermination()
>
>
>
> Now each row contains the contents of the files, provided they are not
> large you can foreach() over the df/rdd and do whatever you want with it,
> such as json.loads()/etc.
>
> If you know the shema of the jsons, you can later explode() them into a
> flat DF, ala https://stackoverflow.com/questions/38243717/spark-
> explode-nested-json-with-array-in-scala
>
>
>
> Note that unless I am missing something you cannot access spark session
> from foreach as code is not running on the driver.
>
> Please say if it makes sense or did I miss anything.
>
>
>
> Boris
>
>
>
> *From:* Amit Joshi 
> *Sent:* Monday, 18 January 2021 17:10
> *To:* Boris Litvak 
> *Cc:* spark-user 
> *Subject:* Re: [Spark Structured Streaming] Processing the data path
> coming from kafka.
>
>
>
> Hi Boris,
>
>
>
> I need to do processing on the data present in the path.
>
> That is the reason I am trying to make the dataframe.
>
>
>
> Can you please provide the example of your solution?
>
>
>
> Regards
>
> Amit
>
>
>
> On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak  wrote:
>
> Hi Amit,
>
>
>
> Why won’t you just map()/mapXXX() the kafkaDf with the mapping function
> that reads the paths?
>
> Also, do you really have to read the json into an additional dataframe?
>
>
>
> Thanks, Boris
>
>
>
> *From:* Amit Joshi 
> *Sent:* Monday, 18 January 2021 15:04
> *To:* spark-user 
> *Subject:* [Spark Structured Streaming] Processing the data path coming
> from kafka.
>
>
>
> Hi ,
>
>
>
> I have a use case where the file path of the json records stored in s3 are
> coming as a kafka
>
> message in kafka. I have to process the data using spark structured
> streaming.
>
>
>
> The design which I thought is as follows:
>
> 1. In kafka Spark structures streaming, read the message containing the
> data path.
>
> 2. Collect the message record in driver. (Messages are small in sizes)
>
> 3. Create the dataframe from the datalocation.
>
>
>
> *kafkaDf*.select(*$"value"*.cast(StringType))
>   .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
>
> //rough code
>
> //collec to driver
>
> *val *records = batchDf.collect()
>
> //create dataframe and process
> records foreach((rec: Row) =>{
>   *println*(*"records:##"*,rec.toString())
>   val path = rec.getAs[String](*"data_path"*)
>
>   val dfToProcess =spark.read.json(path)
>
>   
>
> })
>
> }
>
> I would like to know the views, if this approach is fine? Specifically if 
> there is some problem with
>
> with creating the dataframe after calling collect.
>
> If there is any better approach, please let know the same.
>
>
>
> Regards
>
> Amit Joshi
>
>


RE: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Boris Litvak
HI Amit,

I was thinking along the lines of (python):


@udf(returnType=StringType())
def reader_udf(filename: str) -> str:
with open(filename, "r") as f:
return f.read()


def run_locally():
with utils.build_spark_session("Local", local=True) as spark:
df = spark.readStream.csv(r'testdata', 
schema=StructType([StructField('filename', StringType(), True)]))
df = df.withColumn('content', reader_udf(col('filename')))
q = 
df.select('content').writeStream.queryName('test').format('console').start()
q.awaitTermination()

Now each row contains the contents of a file; provided they are not large, you can 
foreach() over the df/rdd and do whatever you want with it, such as json.loads()/etc.
If you know the schema of the jsons, you can later explode() them into a flat DF, as in 
https://stackoverflow.com/questions/38243717/spark-explode-nested-json-with-array-in-scala

Note that unless I am missing something you cannot access spark session from 
foreach as code is not running on the driver.
Please say if it makes sense or did I miss anything.

Boris

From: Amit Joshi 
Sent: Monday, 18 January 2021 17:10
To: Boris Litvak 
Cc: spark-user 
Subject: Re: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi Boris,

I need to do processing on the data present in the path.
That is the reason I am trying to make the dataframe.

Can you please provide the example of your solution?

Regards
Amit

On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak 
mailto:boris.lit...@skf.com>> wrote:
Hi Amit,

Why won’t you just map()/mapXXX() the kafkaDf with the mapping function that 
reads the paths?
Also, do you really have to read the json into an additional dataframe?

Thanks, Boris

From: Amit Joshi mailto:mailtojoshia...@gmail.com>>
Sent: Monday, 18 January 2021 15:04
To: spark-user mailto:user@spark.apache.org>>
Subject: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi ,

I have a use case where the file path of the json records stored in s3 are 
coming as a kafka
message in kafka. I have to process the data using spark structured streaming.

The design which I thought is as follows:
1. In kafka Spark structures streaming, read the message containing the data 
path.
2. Collect the message record in driver. (Messages are small in sizes)
3. Create the dataframe from the datalocation.


kafkaDf.select($"value".cast(StringType))
  .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {

//rough code

//collec to driver

val records = batchDf.collect()

//create dataframe and process
records foreach((rec: Row) =>{
  println("records:##",rec.toString())
  val path = rec.getAs[String]("data_path")

  val dfToProcess =spark.read.json(path)

  

})

}

I would like to know the views, if this approach is fine? Specifically if there 
is some problem with

with creating the dataframe after calling collect.

If there is any better approach, please let know the same.



Regards

Amit Joshi


Re: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Amit Joshi
Hi Boris,

I need to do processing on the data present in the path.
That is the reason I am trying to make the dataframe.

Can you please provide the example of your solution?

Regards
Amit

On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak  wrote:

> Hi Amit,
>
>
>
> Why won’t you just map()/mapXXX() the kafkaDf with the mapping function
> that reads the paths?
>
> Also, do you really have to read the json into an additional dataframe?
>
>
>
> Thanks, Boris
>
>
>
> *From:* Amit Joshi 
> *Sent:* Monday, 18 January 2021 15:04
> *To:* spark-user 
> *Subject:* [Spark Structured Streaming] Processing the data path coming
> from kafka.
>
>
>
> Hi ,
>
>
>
> I have a use case where the file path of the json records stored in s3 are
> coming as a kafka
>
> message in kafka. I have to process the data using spark structured
> streaming.
>
>
>
> The design which I thought is as follows:
>
> 1. In kafka Spark structures streaming, read the message containing the
> data path.
>
> 2. Collect the message record in driver. (Messages are small in sizes)
>
> 3. Create the dataframe from the datalocation.
>
>
>
> *kafkaDf*.select(*$"value"*.cast(StringType))
>   .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
>
> //rough code
>
> //collec to driver
>
> *val *records = batchDf.collect()
>
> //create dataframe and process
> records foreach((rec: Row) =>{
>   *println*(*"records:##"*,rec.toString())
>   val path = rec.getAs[String](*"data_path"*)
>
>   val dfToProcess =spark.read.json(path)
>
>   
>
> })
>
> }
>
> I would like to know the views, if this approach is fine? Specifically if 
> there is some problem with
>
> with creating the dataframe after calling collect.
>
> If there is any better approach, please let know the same.
>
>
>
> Regards
>
> Amit Joshi
>
>


RE: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Boris Litvak
Hi Amit,

Why won’t you just map()/mapXXX() the kafkaDf with the mapping function that 
reads the paths?
Also, do you really have to read the json into an additional dataframe?

Thanks, Boris

From: Amit Joshi 
Sent: Monday, 18 January 2021 15:04
To: spark-user 
Subject: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi ,

I have a use case where the file path of the json records stored in s3 are 
coming as a kafka
message in kafka. I have to process the data using spark structured streaming.

The design which I thought is as follows:
1. In kafka Spark structures streaming, read the message containing the data 
path.
2. Collect the message record in driver. (Messages are small in sizes)
3. Create the dataframe from the datalocation.


kafkaDf.select($"value".cast(StringType))
  .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {

//rough code

//collec to driver

val records = batchDf.collect()

//create dataframe and process
records foreach((rec: Row) =>{
  println("records:##",rec.toString())
  val path = rec.getAs[String]("data_path")

  val dfToProcess =spark.read.json(path)

  

})

}

I would like to know your views on whether this approach is fine, specifically if 
there is some problem with creating the dataframe after calling collect.

If there is any better approach, please let know the same.



Regards

Amit Joshi


Re: [Spark Structured Streaming] Not working while worker node is on different machine

2020-12-23 Thread lec ssmi
Any more details about it?

bannya  于2020年12月18日周五 上午11:25写道:

> Hi,
>
> I have a spark structured streaming application that is reading data from a
> Kafka topic (16 partitions). I am using standalone mode. I have two workers
> node, one node is on the same machine with masters and another one is on a
> different machine. Both of the worker nodes has 8 cores and 16G RAM with
> one
> executor.
>
> While I run the streaming application with one worker node which is on the
> same machine as the master, the application is working fine. But while I am
> running the application with two worker nodes, 8 tasks successfully
> completed running on worker node 1 (which is on the same machine as
> masters), but the other 8 tasks are scheduled on another worker node but
> it's got stuck in the RUNNING stage and application got stuck.
>
> The normal spark application is running fine with this setup.
>
> Can anyone help me with this?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Structured Streaming XML content

2020-11-26 Thread akstremepro
Hi, I am trying something similar now. However, I am not able to do it on the
streaming dataframe myself. Were you able to resolve it?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Structured streaming - Kakfa - slowness with query 0

2020-10-21 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
I think MaxOffsetsPerTrigger in Spark + Kafka integration docs would meet your 
requirement
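
For reference, a minimal sketch of that option on the Kafka source (broker and topic 
names are placeholders); each trigger then reads at most the given number of offsets 
instead of the whole backlog:

df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "my_topic")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", 10000)   # cap the records pulled per micro-batch
      .load())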

Sent from my iPhone

> 21 окт. 2020 г., в 12:36, KhajaAsmath Mohammed  
> написал(а):
> 
> Thanks. Do we have option to limit number of records ? Like process only 
> 1 or the property we pass ? This way we can handle the amount of the data 
> for batches that we need . 
> 
> Sent from my iPhone
> 
>>> On Oct 21, 2020, at 12:11 AM, lec ssmi  wrote:
>>> 
>> 
>> Structured streaming's  bottom layer also uses a micro-batch mechanism. 
>> It seems that the first batch is slower than  the latter, I also often 
>> encounter this problem. It feels related to the division of batches. 
>>Other the other hand, spark's batch size is usually bigger than flume 
>> transaction bache size. 
>> 
>> 
>> KhajaAsmath Mohammed  于2020年10月21日周三 下午12:19写道:
>>> Yes. Changing back to latest worked but I still see the slowness compared 
>>> to flume. 
>>> 
>>> Sent from my iPhone
>>> 
> On Oct 20, 2020, at 10:21 PM, lec ssmi  wrote:
> 
 
 Do you start your application  with  chasing the early Kafka data  ? 
 
 Lalwani, Jayesh  于2020年10月21日周三 上午2:19写道:
> Are you getting any output? Streaming jobs typically run forever, and 
> keep processing data as it comes in the input. If a streaming job is 
> working well, it will typically generate output at a certain cadence
> 
>  
> 
> From: KhajaAsmath Mohammed 
> Date: Tuesday, October 20, 2020 at 1:23 PM
> To: "user @spark" 
> Subject: [EXTERNAL] Spark Structured streaming - Kakfa - slowness with 
> query 0
> 
>  
> 
> CAUTION: This email originated from outside of the organization. Do not 
> click links or open attachments unless you can confirm the sender and 
> know the content is safe.
> 
>  
> 
> Hi,
> 
>  
> 
> I have started using spark structured streaming for reading data from 
> kaka and the job is very slow. Number of output rows keeps increasing in 
> query 0 and the job is running forever. any suggestions for this please? 
> 
>  
> 
> 
>  
> 
> Thanks,
> 
> Asmath


Re: Spark Structured streaming - Kakfa - slowness with query 0

2020-10-21 Thread KhajaAsmath Mohammed
Thanks. Do we have an option to limit the number of records? Like process only 1, 
or the property we pass? This way we can handle the amount of data for the batches 
that we need. 

Sent from my iPhone

> On Oct 21, 2020, at 12:11 AM, lec ssmi  wrote:
> 
> 
> Structured streaming's  bottom layer also uses a micro-batch mechanism. 
> It seems that the first batch is slower than  the latter, I also often 
> encounter this problem. It feels related to the division of batches. 
>Other the other hand, spark's batch size is usually bigger than flume 
> transaction bache size. 
> 
> 
> KhajaAsmath Mohammed  于2020年10月21日周三 下午12:19写道:
>> Yes. Changing back to latest worked but I still see the slowness compared to 
>> flume. 
>> 
>> Sent from my iPhone
>> 
 On Oct 20, 2020, at 10:21 PM, lec ssmi  wrote:
 
>>> 
>>> Do you start your application  with  chasing the early Kafka data  ? 
>>> 
>>> Lalwani, Jayesh  于2020年10月21日周三 上午2:19写道:
 Are you getting any output? Streaming jobs typically run forever, and keep 
 processing data as it comes in the input. If a streaming job is working 
 well, it will typically generate output at a certain cadence
 
  
 
 From: KhajaAsmath Mohammed 
 Date: Tuesday, October 20, 2020 at 1:23 PM
 To: "user @spark" 
 Subject: [EXTERNAL] Spark Structured streaming - Kakfa - slowness with 
 query 0
 
  
 
 CAUTION: This email originated from outside of the organization. Do not 
 click links or open attachments unless you can confirm the sender and know 
 the content is safe.
 
  
 
 Hi,
 
  
 
 I have started using spark structured streaming for reading data from kaka 
 and the job is very slow. Number of output rows keeps increasing in query 
 0 and the job is running forever. any suggestions for this please? 
 
  
 
 
  
 
 Thanks,
 
 Asmath


Re: Spark Structured streaming - Kakfa - slowness with query 0

2020-10-20 Thread lec ssmi
Structured Streaming's bottom layer also uses a micro-batch
mechanism. It seems that the first batch is slower than the later ones; I also
often encounter this problem. It feels related to the division of batches.
   On the other hand, Spark's batch size is usually bigger than Flume's
transaction batch size.


KhajaAsmath Mohammed  于2020年10月21日周三 下午12:19写道:

> Yes. Changing back to latest worked but I still see the slowness compared
> to flume.
>
> Sent from my iPhone
>
> On Oct 20, 2020, at 10:21 PM, lec ssmi  wrote:
>
> 
> Do you start your application  with  chasing the early Kafka data  ?
>
> Lalwani, Jayesh  于2020年10月21日周三 上午2:19写道:
>
>> Are you getting any output? Streaming jobs typically run forever, and
>> keep processing data as it comes in the input. If a streaming job is
>> working well, it will typically generate output at a certain cadence
>>
>>
>>
>> *From: *KhajaAsmath Mohammed 
>> *Date: *Tuesday, October 20, 2020 at 1:23 PM
>> *To: *"user @spark" 
>> *Subject: *[EXTERNAL] Spark Structured streaming - Kakfa - slowness with
>> query 0
>>
>>
>>
>> *CAUTION*: This email originated from outside of the organization. Do
>> not click links or open attachments unless you can confirm the sender and
>> know the content is safe.
>>
>>
>>
>> Hi,
>>
>>
>>
>> I have started using spark structured streaming for reading data from
>> kaka and the job is very slow. Number of output rows keeps increasing in
>> query 0 and the job is running forever. any suggestions for this please?
>>
>>
>>
>> 
>>
>>
>>
>> Thanks,
>>
>> Asmath
>>
>


Re: Spark Structured streaming - Kakfa - slowness with query 0

2020-10-20 Thread KhajaAsmath Mohammed
Yes. Changing back to latest worked but I still see the slowness compared to 
flume. 

Sent from my iPhone

> On Oct 20, 2020, at 10:21 PM, lec ssmi  wrote:
> 
> 
> Do you start your application  with  chasing the early Kafka data  ? 
> 
> Lalwani, Jayesh  于2020年10月21日周三 上午2:19写道:
>> Are you getting any output? Streaming jobs typically run forever, and keep 
>> processing data as it comes in the input. If a streaming job is working 
>> well, it will typically generate output at a certain cadence
>> 
>>  
>> 
>> From: KhajaAsmath Mohammed 
>> Date: Tuesday, October 20, 2020 at 1:23 PM
>> To: "user @spark" 
>> Subject: [EXTERNAL] Spark Structured streaming - Kakfa - slowness with query >> 0
>> 
>>  
>> 
>> CAUTION: This email originated from outside of the organization. Do not 
>> click links or open attachments unless you can confirm the sender and know 
>> the content is safe.
>> 
>>  
>> 
>> Hi,
>> 
>>  
>> 
>> I have started using spark structured streaming for reading data from kaka 
>> and the job is very slow. Number of output rows keeps increasing in query 0 
>> and the job is running forever. any suggestions for this please? 
>> 
>>  
>> 
>> 
>> 
>>  
>> 
>> Thanks,
>> 
>> Asmath


Re: Spark Structured streaming - Kakfa - slowness with query 0

2020-10-20 Thread lec ssmi
Did you start your application by catching up on the early Kafka data?

Lalwani, Jayesh  于2020年10月21日周三 上午2:19写道:

> Are you getting any output? Streaming jobs typically run forever, and keep
> processing data as it comes in the input. If a streaming job is working
> well, it will typically generate output at a certain cadence
>
>
>
> *From: *KhajaAsmath Mohammed 
> *Date: *Tuesday, October 20, 2020 at 1:23 PM
> *To: *"user @spark" 
> *Subject: *[EXTERNAL] Spark Structured streaming - Kakfa - slowness with
> query 0
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Hi,
>
>
>
> I have started using spark structured streaming for reading data from kaka
> and the job is very slow. Number of output rows keeps increasing in query 0
> and the job is running forever. any suggestions for this please?
>
>
>
>
>
> Thanks,
>
> Asmath
>


Re: Spark Structured streaming - Kakfa - slowness with query 0

2020-10-20 Thread Lalwani, Jayesh
Are you getting any output? Streaming jobs typically run forever, and keep 
processing data as it comes in the input. If a streaming job is working well, 
it will typically generate output at a certain cadence

From: KhajaAsmath Mohammed 
Date: Tuesday, October 20, 2020 at 1:23 PM
To: "user @spark" 
Subject: [EXTERNAL] Spark Structured streaming - Kakfa - slowness with query 0


CAUTION: This email originated from outside of the organization. Do not click 
links or open attachments unless you can confirm the sender and know the 
content is safe.


Hi,

I have started using Spark Structured Streaming for reading data from Kafka and 
the job is very slow. The number of output rows keeps increasing in query 0 and the 
job is running forever. Any suggestions for this, please?


Thanks,
Asmath


Re: Spark structured streaming: periodically refresh static data frame

2020-09-17 Thread Harsh
As per the solution, if we are closing and restarting the query, then what
happens to the state which is maintained in memory? Will that be
retained?
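
For context, the aggregation state lives in the state store under the query's 
checkpoint directory, not only in executor memory, so stopping and restarting the 
query with the same checkpointLocation should restore it (as long as the restarted 
query stays compatible with the checkpoint). A minimal sketch with placeholder names 
and paths:

query = (joined_df.writeStream
         .outputMode("update")
         .option("checkpointLocation", "/checkpoints/my_query")   # offsets and state live here
         .format("console")
         .start())

query.stop()            # e.g. to refresh the static side

# ... rebuild the static DataFrame and the join ...

query = (joined_df.writeStream                                    # restart with the same checkpoint
         .outputMode("update")
         .option("checkpointLocation", "/checkpoints/my_query")
         .format("console")
         .start())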



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [SPARK-STRUCTURED-STREAMING] IllegalStateException: Race while writing batch 4

2020-08-12 Thread Jungtaek Lim
File stream sink doesn't support the functionality. There're several
approaches to do so:

1) two queries write to Kafka (or any intermediate storage which allows
concurrent writes), and let next Spark application read and write to the
final path
2) two queries write to two different directories, and let next Spark
application read and write to the final path
3) use alternative data sources which enable concurrent writes on writing
files (you may want to check Delta Lake, Apache Hudi, Apache Iceberg for
such functionalities - though you'd probably need to learn many other
things to maintain the table in good shape)
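
A minimal PySpark sketch of option 2 (every path is a placeholder; df1 and df2 stand
for the two streaming DataFrames already being written):

# each query keeps its own output and checkpoint directory
q1 = (df1.writeStream.format("parquet")
      .option("path", "s3a://bucket/out/query1")
      .option("checkpointLocation", "s3a://bucket/chk/query1")
      .start())

q2 = (df2.writeStream.format("parquet")
      .option("path", "s3a://bucket/out/query2")
      .option("checkpointLocation", "s3a://bucket/chk/query2")
      .start())

# a separate batch job later reads both directories and writes the final path
combined = spark.read.parquet("s3a://bucket/out/query1", "s3a://bucket/out/query2")
combined.write.mode("append").parquet("s3a://bucket/final")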

Thanks,
Jungtaek Lim (HeartSaVioR)

On Sat, Aug 8, 2020 at 4:19 AM Amit Joshi  wrote:

> Hi,
>
> I have 2spark structure streaming queries writing to the same outpath in
> object storage.
> Once in a while I am getting the "IllegalStateException: Race while
> writing batch 4".
> I found that this error is because there are two writers writing to the
> output path. The file streaming sink doesn't support multiple writers.
> It assumes there is only one writer writing to the path. Each query needs
> to use its own output directory.
>
> Is there a way to write the output to the same path by both queries, as I
> need the output at the same path.?
>
> Regards
> Amit Joshi
>


Re: Spark Structured Streaming keep on consuming usercache

2020-07-20 Thread Piyush Acharya
Can you try calling batchDF.unpersist() once the work is done in the loop?
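
A quick PySpark sketch of that suggestion (monitoring_df is the streaming DataFrame
from the snippet below; the emptiness check uses take(1) instead of isEmpty for older
PySpark versions):

def process_batch(batch_df, batch_id):
    if batch_df.take(1):     # cheap emptiness check
        batch_df.show()
    batch_df.unpersist()     # release any cached data for this micro-batch

(monitoring_df.writeStream
    .trigger(processingTime="120 seconds")
    .foreachBatch(process_batch)
    .start())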

On Mon, Jul 20, 2020 at 3:38 PM Yong Yuan  wrote:

> It seems the following structured streaming code keeps on consuming
> usercache until all disk space are occupied.
>
> val monitoring_stream =
> monitoring_df.writeStream
> .trigger(Trigger.ProcessingTime("120  seconds"))
> .foreachBatch {
> (batchDF: DataFrame, batchId: Long) =>
> if(!batchDF.isEmpty)   batchDF.show()
> }
>
>
> I even did not call batchDF.persist(). Do I need to really save/write
> batchDF to somewhere to release the usercache?
>
> I also tried to call spark.catalog.clearCache() explicitly in a loop,
> which does not help solve this problem either.
>
> Below figure also shows the capacity of the cluster is decreasing with the
> running of these codes.
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-06 Thread Jungtaek Lim
In SS, checkpointing is now part of running each micro-batch and it is
supported natively. (To be clear, my library doesn't deal with the native
checkpointing behaviour.)

In other words, it can't be customized the way you have been doing with your
database. You probably don't need to do that with SS, but it still depends on
what you did with the offsets in the database.
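
Concretely, the processed Kafka offsets are written into the checkpoint directory as
part of every micro-batch, so restarting the query with the same checkpointLocation
makes it carry on from where it stopped; a minimal sketch (paths, broker and topic
are placeholders):

query = (spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker1:9092")
         .option("subscribe", "my_topic")
         .load()
         .writeStream
         .format("parquet")
         .option("path", "/data/out")
         .option("checkpointLocation", "/checkpoints/kafka_to_parquet")   # offsets tracked here
         .start())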

On Tue, Jul 7, 2020 at 1:40 AM KhajaAsmath Mohammed 
wrote:

> Thanks Lim, this is really helpful. I have few questions.
>
> Our earlier approach used low level customer to read offsets from database
> and use those information to read using spark streaming in Dstreams. Save
> the offsets back once the process is finished. This way we never lost data.
>
> with your library, will it automatically process from the last offset it
> processed when the application was stopped or killed for some time.
>
> Thanks,
> Asmath
>
> On Sun, Jul 5, 2020 at 6:22 PM Jungtaek Lim 
> wrote:
>
>> There're sections in SS programming guide which exactly answer these
>> questions:
>>
>>
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
>>
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>>
>> Also, for Kafka data source, there's a 3rd party project (DISCLAIMER: I'm
>> the author) to help you commit the offset to Kafka with the specific group
>> ID.
>>
>> https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer
>>
>> After then, you can also leverage the Kafka ecosystem to monitor the
>> progress in point of Kafka's view, especially the gap between highest
>> offset and committed offset.
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>>
>> On Mon, Jul 6, 2020 at 2:53 AM Gabor Somogyi 
>> wrote:
>>
>>> In 3.0 the community just added it.
>>>
>>> On Sun, 5 Jul 2020, 14:28 KhajaAsmath Mohammed, 
>>> wrote:
>>>
 Hi,

 We are trying to move our existing code from spark dstreams to
 structured streaming for one of the old application which we built few
 years ago.

 Structured streaming job doesn’t have streaming tab in sparkui. Is
 there a way to monitor the job submitted by us in structured streaming ?
 Since the job runs for every trigger, how can we kill the job and restart
 if needed.

 Any suggestions on this please

 Thanks,
 Asmath



 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-06 Thread KhajaAsmath Mohammed
Thanks Lim, this is really helpful. I have a few questions.

Our earlier approach used a low-level consumer to read offsets from a database
and used that information to read with Spark Streaming DStreams, saving
the offsets back once the process finished. This way we never lost data.

With your library, will it automatically process from the last offset it
processed when the application was stopped or killed for some time?

Thanks,
Asmath

On Sun, Jul 5, 2020 at 6:22 PM Jungtaek Lim 
wrote:

> There're sections in SS programming guide which exactly answer these
> questions:
>
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries
>
> Also, for Kafka data source, there's a 3rd party project (DISCLAIMER: I'm
> the author) to help you commit the offset to Kafka with the specific group
> ID.
>
> https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer
>
> After then, you can also leverage the Kafka ecosystem to monitor the
> progress in point of Kafka's view, especially the gap between highest
> offset and committed offset.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Mon, Jul 6, 2020 at 2:53 AM Gabor Somogyi 
> wrote:
>
>> In 3.0 the community just added it.
>>
>> On Sun, 5 Jul 2020, 14:28 KhajaAsmath Mohammed, 
>> wrote:
>>
>>> Hi,
>>>
>>> We are trying to move our existing code from spark dstreams to
>>> structured streaming for one of the old application which we built few
>>> years ago.
>>>
>>> Structured streaming job doesn’t have streaming tab in sparkui. Is there
>>> a way to monitor the job submitted by us in structured streaming ? Since
>>> the job runs for every trigger, how can we kill the job and restart if
>>> needed.
>>>
>>> Any suggestions on this please
>>>
>>> Thanks,
>>> Asmath
>>>
>>>
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-05 Thread Jungtaek Lim
There're sections in SS programming guide which exactly answer these
questions:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#managing-streaming-queries
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#monitoring-streaming-queries

Also, for Kafka data source, there's a 3rd party project (DISCLAIMER: I'm
the author) to help you commit the offset to Kafka with the specific group
ID.

https://github.com/HeartSaVioR/spark-sql-kafka-offset-committer

After then, you can also leverage the Kafka ecosystem to monitor the
progress in point of Kafka's view, especially the gap between highest
offset and committed offset.
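
On the managing side, a minimal PySpark sketch of what those guide sections cover,
i.e. listing the active queries, checking progress and stopping one:

# list queries running in this SparkSession
for q in spark.streams.active:
    print(q.id, q.name, q.status['message'])

# inspect the most recent micro-batch of one query
query = spark.streams.active[0]
print(query.lastProgress)

# graceful stop; restarting is just calling writeStream ... start() again
query.stop()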

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Jul 6, 2020 at 2:53 AM Gabor Somogyi 
wrote:

> In 3.0 the community just added it.
>
> On Sun, 5 Jul 2020, 14:28 KhajaAsmath Mohammed, 
> wrote:
>
>> Hi,
>>
>> We are trying to move our existing code from spark dstreams to structured
>> streaming for one of the old application which we built few years ago.
>>
>> Structured streaming job doesn’t have streaming tab in sparkui. Is there
>> a way to monitor the job submitted by us in structured streaming ? Since
>> the job runs for every trigger, how can we kill the job and restart if
>> needed.
>>
>> Any suggestions on this please
>>
>> Thanks,
>> Asmath
>>
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark structured streaming -Kafka - deployment / monitor and restart

2020-07-05 Thread Gabor Somogyi
In 3.0 the community just added it.

On Sun, 5 Jul 2020, 14:28 KhajaAsmath Mohammed, 
wrote:

> Hi,
>
> We are trying to move our existing code from spark dstreams to structured
> streaming for one of the old application which we built few years ago.
>
> Structured streaming job doesn’t have streaming tab in sparkui. Is there a
> way to monitor the job submitted by us in structured streaming ? Since the
> job runs for every trigger, how can we kill the job and restart if needed.
>
> Any suggestions on this please
>
> Thanks,
> Asmath
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Structured Streaming: “earliest” as “startingOffsets” is not working

2020-06-26 Thread Srinivas V
Cool. Are you not using a watermark?
Also, is it possible to start listening to offsets from a specific date-time?

Regards
Srini

On Sat, Jun 27, 2020 at 6:12 AM Eric Beabes 
wrote:

> My apologies...  After I set the 'maxOffsetsPerTrigger' to a value such as
> '20' it started working. Hopefully this will help someone. Thanks.
>
> On Fri, Jun 26, 2020 at 2:12 PM Something Something <
> mailinglist...@gmail.com> wrote:
>
>> My Spark Structured Streaming job works fine when I set "startingOffsets"
>> to "latest". When I simply change it to "earliest" & specify a new "check
>> point directory", the job doesn't work. The states don't get timed out
>> after 10 minutes.
>>
>> While debugging I noticed that my 'state' logic is indeed getting
>> executed but states just don't time out - as they do when I use "latest".
>> Any reason why?
>>
>> Is this a known issue?
>>
>> *Note*: I've tried this under Spark 2.3 & 2.4
>>
>


Re: Spark Structured Streaming: “earliest” as “startingOffsets” is not working

2020-06-26 Thread Eric Beabes
My apologies...  After I set the 'maxOffsetsPerTrigger' to a value such as
'20' it started working. Hopefully this will help someone. Thanks.

On Fri, Jun 26, 2020 at 2:12 PM Something Something <
mailinglist...@gmail.com> wrote:

> My Spark Structured Streaming job works fine when I set "startingOffsets"
> to "latest". When I simply change it to "earliest" & specify a new "check
> point directory", the job doesn't work. The states don't get timed out
> after 10 minutes.
>
> While debugging I noticed that my 'state' logic is indeed getting executed
> but states just don't time out - as they do when I use "latest". Any reason
> why?
>
> Is this a known issue?
>
> *Note*: I've tried this under Spark 2.3 & 2.4
>


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
ok, thanks for confirming, I will do it this way.

Regards
Srini

On Tue, Jun 9, 2020 at 11:31 PM Gerard Maas  wrote:

> Hi Srinivas,
>
> Reading from different brokers is possible but you need to connect to each
> Kafka cluster separately.
> Trying to mix connections to two different Kafka clusters in one
> subscriber is not supported. (I'm sure that it would give all kind of weird
> errors)
> The  "kafka.bootstrap.servers" option is there to indicate the potential
> many brokers of the *same* Kafka cluster.
>
> The way to address this is following the suggestion of German to create a
> subscriptions for each Kafka cluster you are talking to.
>
> val df_cluster1 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
>   .option("subscribe", "topic1, topic2")
>  .load()
>
> val df_cluster2 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
>   .option("subscribe", "topic3, topicn, topicn+1,")
>  .load()
>
> After acquiring the DataFrame, you can union them and treat all the data
> with a single process.
>
> val unifiedData = df_cluster1.union(df_cluster2)
> // apply further transformations on `unifiedData`
>
> kr, Gerard.
>
>
> :
>
>
>
> On Tue, Jun 9, 2020 at 6:30 PM Srinivas V  wrote:
>
>> Thanks for the quick reply. This may work but I have like 5 topics to
>> listen to right now, I am trying to keep all topics in an array in a
>> properties file and trying to read all at once. This way it is dynamic and
>> you have one code block like below and you may add or delete topics from
>> the config file without changing code. If someone confirms that it does not
>> work, I would have to do something like you have provided.
>>
>> val df_cluster1 = spark
>>   .read
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", 
>> "cluster1_host:cluster1_port,cluster2_host:port")
>>
>> .option("subscribe", "topic1, topic2,topic3,topic4,topic5")
>>
>>


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Gerard Maas
Hi Srinivas,

Reading from different brokers is possible, but you need to connect to each
Kafka cluster separately.
Trying to mix connections to two different Kafka clusters in one subscriber
is not supported. (I'm sure it would give all kinds of weird errors.)
The "kafka.bootstrap.servers" option is there to indicate the potentially
many brokers of the *same* Kafka cluster.

The way to address this is to follow German's suggestion and create a
subscription for each Kafka cluster you are talking to.

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1, topic2")
 .load()

val df_cluster2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:cluster2_port")
  .option("subscribe", "topic3, topicn, topicn+1,")
 .load()

After acquiring the DataFrame, you can union them and treat all the data
with a single process.

val unifiedData = df_cluster1.union(df_cluster2)
// apply further transformations on `unifiedData`

kr, Gerard.





On Tue, Jun 9, 2020 at 6:30 PM Srinivas V  wrote:

> Thanks for the quick reply. That may work, but I have about 5 topics to
> listen to right now. I am trying to keep all the topics in an array in a
> properties file and read them all at once. This way it stays dynamic: there
> is one code block like the one below, and topics can be added or removed in
> the config file without changing code. If someone confirms that this does
> not work, I will have to do something like what you have provided.
>
> val df_cluster1 = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", 
> "cluster1_host:cluster1_port,cluster2_host:port")
>
> .option("subscribe", "topic1, topic2,topic3,topic4,topic5")
>
>


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread Srinivas V
Thanks for the quick reply. That may work, but I have about 5 topics to
listen to right now. I am trying to keep all the topics in an array in a
properties file and read them all at once. This way it stays dynamic: there
is one code block like the one below, and topics can be added or removed in
the config file without changing code. If someone confirms that this does
not work, I will have to do something like what you have provided.

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers",
"cluster1_host:cluster1_port,cluster2_host:port")

.option("subscribe", "topic1, topic2,topic3,topic4,topic5")


Re: [spark-structured-streaming] [kafka] consume topics from multiple Kafka clusters

2020-06-09 Thread German SM
Hello,

I've never tried that; doesn't this work?

val df_cluster1 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster1_host:cluster1_port")
  .option("subscribe", "topic1")
  .load()

val df_cluster2 = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "cluster2_host:port")
  .option("subscribe", "topic2")
  .load()


On Tue, 9 Jun 2020 at 18:10, Srinivas V  wrote:

> Hello,
>  In Structured Streaming, is it possible to have one spark application
> with one query to consume topics from multiple kafka clusters?
>
> I am trying to consume two topics, each from a different Kafka cluster, but
> one of the topics is reported as an unknown topic and the job keeps running
> without completing in the Spark UI.
>
> Is it not allowed in Spark 2.4.5?
>
> Regards
> Srini
>
>
>
>


Re: Spark structured streaming - performance tuning

2020-05-08 Thread Srinivas V
Can anyone else answer the questions below on performance tuning for
Structured Streaming?
@Jacek?

On Sun, May 3, 2020 at 12:07 AM Srinivas V  wrote:

> Hi Alex, I read the book; it is a good one, but I don't see the things I
> most want to understand.
> You are right about partitions and tasks.
> 1. How do I use coalesce with Spark Structured Streaming?
>
> I also want to ask a few more questions:
> 2. How do I restrict the number of executors in Structured Streaming?
> Is --num-executors the minimum?
> To cap the maximum, can I use spark.dynamicAllocation.maxExecutors?
>
> 3. Do other streaming properties, such as
> spark.streaming.dynamicAllocation.enabled, also hold for Structured
> Streaming? If not, which ones does it take into consideration?
>
> 4. Does Structured Streaming 2.4.5 allow dynamic allocation of
> executors/cores? In the case of a Kafka consumer, when the cluster has to
> scale down, does it reconfigure the mapping of executor cores to Kafka
> partitions?
>
> 5. Why is the Spark Structured Streaming web UI (SQL tab) not as
> informative as the Streaming tab of Spark Streaming?
>
> It would be great if these questions were answered; otherwise the only
> option left is to go through the Spark code and figure it out.
>
> On Sat, Apr 18, 2020 at 1:09 PM Alex Ott  wrote:
>
>> Just to clarify - I didn't write this explicitly in my answer. When you're
>> working with Kafka, every partition in Kafka is mapped into Spark
>> partition. And in Spark, every partition is mapped into task.   But you
>> can
>> use `coalesce` to decrease the number of Spark partitions, so you'll have
>> less tasks...
>>
>> Srinivas V  at "Sat, 18 Apr 2020 10:32:33 +0530" wrote:
>>  SV> Thank you Alex. I will check it out and let you know if I have any
>> questions
>>
>>  SV> On Fri, Apr 17, 2020 at 11:36 PM Alex Ott  wrote:
>>
>>  SV> http://shop.oreilly.com/product/0636920047568.do has quite good
>> information
>>  SV> on it.  For Kafka, you need to start with approximation that
>> processing of
>>  SV> each partition is a separate task that need to be executed, so
>> you need to
>>  SV> plan number of cores correspondingly.
>>  SV>
>>  SV> Srinivas V  at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
>>  SV>  SV> Hello,
>>  SV>  SV> Can someone point me to a good video or document which
>> takes about performance tuning for structured streaming app?
>>  SV>  SV> I am looking especially for listening to Kafka topics say 5
>> topics each with 100 portions .
>>  SV>  SV> Trying to figure out best cluster size and number of
>> executors and cores required.
>>
>>
>> --
>> With best wishes,Alex Ott
>> http://alexott.net/
>> Twitter: alexott_en (English), alexott (Russian)
>>
>


Re: Spark structured streaming - performance tuning

2020-05-02 Thread Srinivas V
Hi Alex, I read the book; it is a good one, but I don't see the things I
most want to understand.
You are right about partitions and tasks.
1. How do I use coalesce with Spark Structured Streaming?

I also want to ask a few more questions:
2. How do I restrict the number of executors in Structured Streaming?
Is --num-executors the minimum?
To cap the maximum, can I use spark.dynamicAllocation.maxExecutors?

3. Do other streaming properties, such as
spark.streaming.dynamicAllocation.enabled, also hold for Structured
Streaming? If not, which ones does it take into consideration?

4. Does Structured Streaming 2.4.5 allow dynamic allocation of
executors/cores? In the case of a Kafka consumer, when the cluster has to
scale down, does it reconfigure the mapping of executor cores to Kafka
partitions?

5. Why is the Spark Structured Streaming web UI (SQL tab) not as
informative as the Streaming tab of Spark Streaming?

It would be great if these questions were answered; otherwise the only
option left is to go through the Spark code and figure it out.
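
On question 2, the executor-capping settings are standard Spark configs; a
sketch with arbitrary values is below. As far as I know the spark.streaming.*
dynamic allocation settings target the old DStream API, and dynamic
allocation with Structured Streaming has its own caveats around state and
shuffle files, so treat this as an assumption to verify rather than a
recommendation:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tuning-sketch")
  .config("spark.shuffle.service.enabled", "true")        // usually required for dynamic allocation
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")    // lower bound
  .config("spark.dynamicAllocation.maxExecutors", "10")   // hard upper bound
  .getOrCreate()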

On Sat, Apr 18, 2020 at 1:09 PM Alex Ott  wrote:

> Just to clarify - I didn't write this explicitly in my answer. When you're
> working with Kafka, every partition in Kafka is mapped into Spark
> partition. And in Spark, every partition is mapped into task.   But you can
> use `coalesce` to decrease the number of Spark partitions, so you'll have
> less tasks...
>
> Srinivas V  at "Sat, 18 Apr 2020 10:32:33 +0530" wrote:
>  SV> Thank you Alex. I will check it out and let you know if I have any
> questions
>
>  SV> On Fri, Apr 17, 2020 at 11:36 PM Alex Ott  wrote:
>
>  SV> http://shop.oreilly.com/product/0636920047568.do has quite good
> information
>  SV> on it.  For Kafka, you need to start with approximation that
> processing of
>  SV> each partition is a separate task that need to be executed, so
> you need to
>  SV> plan number of cores correspondingly.
>  SV>
>  SV> Srinivas V  at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
>  SV>  SV> Hello,
>  SV>  SV> Can someone point me to a good video or document which takes
> about performance tuning for structured streaming app?
>  SV>  SV> I am looking especially for listening to Kafka topics say 5
> topics each with 100 portions .
>  SV>  SV> Trying to figure out best cluster size and number of
> executors and cores required.
>
>
> --
> With best wishes,Alex Ott
> http://alexott.net/
> Twitter: alexott_en (English), alexott (Russian)
>


Re: Spark structured streaming - Fallback to earliest offset

2020-04-19 Thread Jungtaek Lim
You may want to check "where" the job is stuck by taking a thread dump - it
could be in the Kafka consumer, in the Spark codebase, etc. Without that
information it's hard to say.

On Thu, Apr 16, 2020 at 4:22 PM Ruijing Li  wrote:

> Thanks Jungtaek, that makes sense.
>
> I tried Burak’s solution of just turning failOnDataLoss to be false, but
> instead of failing, the job is stuck. I’m guessing that the offsets are
> being deleted faster than the job can process them and it will be stuck
> unless I increase resources? Or does once the exception happen, spark will
> hang?
>
> On Tue, Apr 14, 2020 at 10:48 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> I think Spark is trying to ensure that it reads the input "continuously"
>> without any missing. Technically it may be valid to say the situation is a
>> kind of "data-loss", as the query couldn't process the offsets which are
>> being thrown out, and owner of the query needs to be careful as it affects
>> the result.
>>
>> If your streaming query keeps up with input rate then it's pretty rare
>> for the query to go under retention. Even it lags a bit, it'd be safe if
>> retention is set to enough period. The ideal state would be ensuring your
>> query to process all offsets before they are thrown out by retention (don't
>> leave the query lagging behind - either increasing processing power or
>> increasing retention duration, though most probably you'll need to do
>> former), but if you can't make sure and if you understand the risk then yes
>> you can turn off the option and take the risk.
>>
>>
>> On Wed, Apr 15, 2020 at 9:24 AM Ruijing Li  wrote:
>>
>>> I see, I wasn’t sure if that would work as expected. The docs seems to
>>> suggest to be careful before turning off that option, and I’m not sure why
>>> failOnDataLoss is true by default.
>>>
>>> On Tue, Apr 14, 2020 at 5:16 PM Burak Yavuz  wrote:
>>>
 Just set `failOnDataLoss=false` as an option in readStream?

 On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li 
 wrote:

> Hi all,
>
> I have a spark structured streaming app that is consuming from a kafka
> topic with retention set up. Sometimes I face an issue where my query has
> not finished processing a message but the retention kicks in and deletes
> the offset, which since I use the default setting of “failOnDataLoss=true”
> causes my query to fail. The solution I currently have is manual, deleting
> the offsets directory and rerunning.
>
> I instead like to have spark automatically fall back to the earliest
> offset available. The solutions I saw recommend setting auto.offset =
> earliest, but for structured streaming, you cannot set that. How do I do
> this for structured streaming?
>
> Thanks!
> --
> Cheers,
> Ruijing Li
>
 --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>


Re: Spark structured streaming - performance tuning

2020-04-18 Thread Alex Ott
Just to clarify - I didn't write this explicitly in my answer. When you're
working with Kafka, every Kafka partition is mapped to a Spark partition,
and every Spark partition is mapped to a task. But you can use `coalesce`
to decrease the number of Spark partitions, so you'll have fewer tasks...
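
A minimal sketch of that, with placeholder broker/topic names and an assumed
SparkSession named spark:

val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")       // e.g. a topic with 100 partitions
  .load()

// Downstream stages now see roughly 10 partitions (and ~10 tasks per
// micro-batch) instead of 100, trading parallelism for fewer, larger tasks.
val narrowed = raw.coalesce(10)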

Srinivas V  at "Sat, 18 Apr 2020 10:32:33 +0530" wrote:
 SV> Thank you Alex. I will check it out and let you know if I have any 
questions

 SV> On Fri, Apr 17, 2020 at 11:36 PM Alex Ott  wrote:

 SV> http://shop.oreilly.com/product/0636920047568.do has quite good 
information
 SV> on it.  For Kafka, you need to start with approximation that 
processing of
 SV> each partition is a separate task that need to be executed, so you 
need to
 SV> plan number of cores correspondingly.
 SV>
 SV> Srinivas V  at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
 SV>  SV> Hello, 
 SV>  SV> Can someone point me to a good video or document which takes 
about performance tuning for structured streaming app? 
 SV>  SV> I am looking especially for listening to Kafka topics say 5 
topics each with 100 portions .
 SV>  SV> Trying to figure out best cluster size and number of executors 
and cores required. 


-- 
With best wishes,Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)




Re: Spark structured streaming - performance tuning

2020-04-17 Thread Srinivas V
Thank you Alex. I will check it out and let you know if I have any questions

On Fri, Apr 17, 2020 at 11:36 PM Alex Ott  wrote:

> http://shop.oreilly.com/product/0636920047568.do has quite good
> information
> on it.  For Kafka, you need to start with approximation that processing of
> each partition is a separate task that need to be executed, so you need to
> plan number of cores correspondingly.
>
> Srinivas V  at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
>  SV> Hello,
>  SV> Can someone point me to a good video or document which takes about
> performance tuning for structured streaming app?
>  SV> I am looking especially for listening to Kafka topics say 5 topics
> each with 100 portions .
>  SV> Trying to figure out best cluster size and number of executors and
> cores required.
>
>
> --
> With best wishes,Alex Ott
> http://alexott.net/
> Twitter: alexott_en (English), alexott (Russian)
>


Re: Spark structured streaming - performance tuning

2020-04-17 Thread Alex Ott
http://shop.oreilly.com/product/0636920047568.do has quite good information
on it.  For Kafka, you should start from the approximation that processing
each partition is a separate task that needs to be executed, so you need to
plan the number of cores accordingly.
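
A back-of-envelope version of that approximation, using the numbers from the
question quoted below; the executor and core counts are assumptions:

val topics             = 5
val partitionsPerTopic = 100
val tasksPerBatch      = topics * partitionsPerTopic      // 500 Kafka partitions -> ~500 tasks
val totalCores         = 10 * 5                           // e.g. 10 executors x 5 cores each
val taskWaves          = math.ceil(tasksPerBatch.toDouble / totalCores).toInt   // ~10 waves per micro-batch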

Srinivas V  at "Thu, 16 Apr 2020 22:49:15 +0530" wrote:
 SV> Hello, 
 SV> Can someone point me to a good video or document which talks about 
performance tuning for a structured streaming app? 
 SV> I am especially looking at listening to Kafka topics, say 5 topics each 
with 100 partitions.
 SV> Trying to figure out the best cluster size and number of executors and 
cores required. 


-- 
With best wishes,Alex Ott
http://alexott.net/
Twitter: alexott_en (English), alexott (Russian)




Re: Spark structured streaming - Fallback to earliest offset

2020-04-16 Thread Ruijing Li
Thanks Jungtaek, that makes sense.

I tried Burak’s solution of just turning failOnDataLoss off, but instead of
failing, the job is stuck. I’m guessing that the offsets are being deleted
faster than the job can process them, so it will stay stuck unless I
increase resources? Or, once the exception happens, will Spark hang?

On Tue, Apr 14, 2020 at 10:48 PM Jungtaek Lim 
wrote:

> I think Spark is trying to ensure that it reads the input "continuously"
> without any missing. Technically it may be valid to say the situation is a
> kind of "data-loss", as the query couldn't process the offsets which are
> being thrown out, and owner of the query needs to be careful as it affects
> the result.
>
> If your streaming query keeps up with input rate then it's pretty rare for
> the query to go under retention. Even it lags a bit, it'd be safe if
> retention is set to enough period. The ideal state would be ensuring your
> query to process all offsets before they are thrown out by retention (don't
> leave the query lagging behind - either increasing processing power or
> increasing retention duration, though most probably you'll need to do
> former), but if you can't make sure and if you understand the risk then yes
> you can turn off the option and take the risk.
>
>
> On Wed, Apr 15, 2020 at 9:24 AM Ruijing Li  wrote:
>
>> I see, I wasn’t sure if that would work as expected. The docs seems to
>> suggest to be careful before turning off that option, and I’m not sure why
>> failOnDataLoss is true by default.
>>
>> On Tue, Apr 14, 2020 at 5:16 PM Burak Yavuz  wrote:
>>
>>> Just set `failOnDataLoss=false` as an option in readStream?
>>>
>>> On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li 
>>> wrote:
>>>
 Hi all,

 I have a spark structured streaming app that is consuming from a kafka
 topic with retention set up. Sometimes I face an issue where my query has
 not finished processing a message but the retention kicks in and deletes
 the offset, which since I use the default setting of “failOnDataLoss=true”
 causes my query to fail. The solution I currently have is manual, deleting
 the offsets directory and rerunning.

 I instead like to have spark automatically fall back to the earliest
 offset available. The solutions I saw recommend setting auto.offset =
 earliest, but for structured streaming, you cannot set that. How do I do
 this for structured streaming?

 Thanks!
 --
 Cheers,
 Ruijing Li

>>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Re: Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Jungtaek Lim
I think Spark is trying to ensure that it reads the input "continuously"
without missing anything. Technically it may be valid to say the situation
is a kind of "data loss", as the query couldn't process the offsets which
are being thrown out, and the owner of the query needs to be careful as it
affects the result.

If your streaming query keeps up with the input rate then it's pretty rare
for the query to fall behind retention. Even if it lags a bit, it'd be safe
if the retention is set to a long enough period. The ideal state is to
ensure your query processes all offsets before they are thrown out by
retention (don't leave the query lagging behind - either increase
processing power or increase the retention duration, though most probably
you'll need to do the former), but if you can't make sure of that and you
understand the risk, then yes, you can turn off the option and take the
risk.
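
A rough way to frame that check, with every number below an assumption:

val retentionMs    = 6L * 60 * 60 * 1000   // topic retention, e.g. 6 hours
val worstCaseLagMs = 30L * 60 * 1000       // worst observed end-to-end lag, e.g. 30 minutes
// Keep a comfortable margin so offsets are processed well before they expire.
val safeToKeepFailOnDataLoss = worstCaseLagMs < retentionMs / 2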


On Wed, Apr 15, 2020 at 9:24 AM Ruijing Li  wrote:

> I see, I wasn’t sure if that would work as expected. The docs seems to
> suggest to be careful before turning off that option, and I’m not sure why
> failOnDataLoss is true by default.
>
> On Tue, Apr 14, 2020 at 5:16 PM Burak Yavuz  wrote:
>
>> Just set `failOnDataLoss=false` as an option in readStream?
>>
>> On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li  wrote:
>>
>>> Hi all,
>>>
>>> I have a spark structured streaming app that is consuming from a kafka
>>> topic with retention set up. Sometimes I face an issue where my query has
>>> not finished processing a message but the retention kicks in and deletes
>>> the offset, which since I use the default setting of “failOnDataLoss=true”
>>> causes my query to fail. The solution I currently have is manual, deleting
>>> the offsets directory and rerunning.
>>>
>>> I instead like to have spark automatically fall back to the earliest
>>> offset available. The solutions I saw recommend setting auto.offset =
>>> earliest, but for structured streaming, you cannot set that. How do I do
>>> this for structured streaming?
>>>
>>> Thanks!
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>


Re: Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Ruijing Li
I see, I wasn’t sure if that would work as expected. The docs seem to
suggest being careful before turning off that option, and I’m not sure why
failOnDataLoss is true by default.

On Tue, Apr 14, 2020 at 5:16 PM Burak Yavuz  wrote:

> Just set `failOnDataLoss=false` as an option in readStream?
>
> On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li  wrote:
>
>> Hi all,
>>
>> I have a spark structured streaming app that is consuming from a kafka
>> topic with retention set up. Sometimes I face an issue where my query has
>> not finished processing a message but the retention kicks in and deletes
>> the offset, which since I use the default setting of “failOnDataLoss=true”
>> causes my query to fail. The solution I currently have is manual, deleting
>> the offsets directory and rerunning.
>>
>> I instead like to have spark automatically fall back to the earliest
>> offset available. The solutions I saw recommend setting auto.offset =
>> earliest, but for structured streaming, you cannot set that. How do I do
>> this for structured streaming?
>>
>> Thanks!
>> --
>> Cheers,
>> Ruijing Li
>>
> --
Cheers,
Ruijing Li


Re: Spark structured streaming - Fallback to earliest offset

2020-04-14 Thread Burak Yavuz
Just set `failOnDataLoss=false` as an option in readStream?
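
Concretely, that looks something like the sketch below (placeholder
broker/topic names, SparkSession assumed to be in scope as spark); with the
option off, offsets that have aged out of retention are skipped with a
warning instead of failing the query:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("failOnDataLoss", "false")   // don't fail when offsets age out of Kafka retention
  .load()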

On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li  wrote:

> Hi all,
>
> I have a spark structured streaming app that is consuming from a kafka
> topic with retention set up. Sometimes I face an issue where my query has
> not finished processing a message but the retention kicks in and deletes
> the offset, which since I use the default setting of “failOnDataLoss=true”
> causes my query to fail. The solution I currently have is manual, deleting
> the offsets directory and rerunning.
>
> I instead like to have spark automatically fall back to the earliest
> offset available. The solutions I saw recommend setting auto.offset =
> earliest, but for structured streaming, you cannot set that. How do I do
> this for structured streaming?
>
> Thanks!
> --
> Cheers,
> Ruijing Li
>


Re: spark structured streaming GroupState returns weird values from sate

2020-03-31 Thread Jungtaek Lim
That seems to come from the difference in how Spark infers the schema and
creates the serializer/deserializer for Java beans when constructing the
bean encoder.

When inferring the schema for Java beans, all properties which have getter
methods are considered. When creating the serializer/deserializer, only
properties which have both getter and setter methods are considered. The
Java Introspector is used here, which doesn't seem to deal with fields
(it's the JDK feature, hence I guess that's the right definition of Java
beans), so having getter methods with no matching setter methods might
lead to problems.

The code line has a TODO, but the code is ancient (added in 2015) - I have
no context and I'm not 100% sure fixing it would be safe. It would be great
if you could provide a simple reproducer to play with, but given you've
fixed the issue...
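
A small sketch of that inspection (the thread's code is Java; this uses the
Scala API with a made-up bean, e.g. pasted into spark-shell). The getter-only
"derived" property below shows up in schema inference even though the
serializer/deserializer side ignores it, which is the kind of mismatch
described above:

import scala.beans.BeanProperty
import org.apache.spark.sql.Encoders

// Made-up bean standing in for the state class from this thread.
class SessionState(@BeanProperty var requestId: String,
                   @BeanProperty var firstEventTimeMillis: Long,
                   @BeanProperty var lastEventTimeMillis: Long) {
  def this() = this(null, 0L, 0L)   // bean encoders need a no-arg constructor
  // Getter with no matching setter: considered for schema inference only.
  def getDurationMillis: Long = lastEventTimeMillis - firstEventTimeMillis
}

// Print the inferred schema and compare it across runs or code changes.
println(Encoders.bean(classOf[SessionState]).schema.treeString)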

On Tue, Mar 31, 2020 at 5:01 PM Srinivas V  wrote:

>
> Never mind. It got resolved after I removed extra two getter methods (to
> calculate duration) I created in my State specific Java bean
> (ProductSessionInformation). But I am surprised why it has created so much
> problem. I guess when this bean is converted to Scala class it may not be
> taking care of non getter methods of the fields defined? Still how is that
> causing the state object get corrupt so much?
>
>
> On Sat, Mar 28, 2020 at 7:46 PM Srinivas V  wrote:
>
>> Ok, I will try to create some simple code to reproduce, if I can. Problem
>> is that I am adding this code in an existing big project with several
>> dependencies with spark streaming older version(2.2) on root level etc.
>>
>> Also, I observed that there is @Experimental on GroupState class. What
>> state is it in now? Several people using this feature in prod?
>>
>>
>> On Sat, Mar 28, 2020 at 6:23 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> I have't heard known issue for this - that said, this may require new
>>> investigation which is not possible or require huge effort without simple
>>> reproducer.
>>>
>>> Contributors (who are basically volunteers) may not want to struggle to
>>> reproduce from your partial information - I'd recommend you to spend your
>>> time to help volunteers starting from simple reproducer, if you are stuck
>>> at it and have to resolve it.
>>>
>>> Could you please get rid of the business logic which you may want to
>>> redact, and provide full of source code which reproduces the bug?
>>>
>>> On Sat, Mar 28, 2020 at 8:11 PM Srinivas V  wrote:
>>>
 Sorry for typos , correcting them below

 On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:

> Sorry I was just changing some names not to send exact names. Please
> ignore that. I am really struggling with this since couple of days. Can
> this happen due to
> 1. some of the values being null or
> 2.UTF8  issue ? Or some serilization/ deserilization issue ?
> 3. Not enough memory ?
> BTW, I am using same names in my code.
>
> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Well, the code itself doesn't seem to be OK - you're using
>> ProductStateInformation as the class of State whereas you provide
>> ProductSessionInformation to Encoder for State.
>>
>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Could you play with Encoders.bean()? You can Encoders.bean() with
>>> your class, and call .schema() with the return value to see how it
>>> transforms to the schema in Spark SQL. The schema must be consistent 
>>> across
>>> multiple JVM runs to make it work properly, but I suspect it doesn't 
>>> retain
>>> the order.
>>>
>>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V 
>>> wrote:
>>>
 I am listening to Kafka topic with a structured streaming
 application with Java,  testing it on my local Mac.
 When I retrieve back GroupState object
 with state.get(), it is giving some random values for the fields in the
 object, some are interchanging some are default and some are junk 
 values.

 See this example below:
 While setting I am setting:
 ProductSessionInformation{requestId='222112345',
 productId='222112345', priority='0', firstEventTimeMillis=1585312384,
 lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
 numberOfEvents=1}

 When I retrieve it back, it comes like this:
 ProductSessionInformation{requestId='some junk characters are
 coming here' productId='222112345', priority='222112345',
 firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
 firstReceivedTimeMillis=1585312384, numberOfEvents=1}

 Any clue why it might be happening? I am stuck with this for couple
 of days. Immediate help is appreciated.

 code snippet:


Re: spark structured streaming GroupState returns weird values from sate

2020-03-31 Thread Srinivas V
Never mind. It got resolved after I removed the two extra getter methods
(to calculate duration) that I had created in my state-specific Java bean
(ProductSessionInformation). But I am surprised that it caused so many
problems. I guess when this bean is converted to a Scala class, it may not
be taking care of non-getter methods of the defined fields? Still, how does
that corrupt the state object so badly?


On Sat, Mar 28, 2020 at 7:46 PM Srinivas V  wrote:

> Ok, I will try to create some simple code to reproduce, if I can. Problem
> is that I am adding this code in an existing big project with several
> dependencies with spark streaming older version(2.2) on root level etc.
>
> Also, I observed that there is @Experimental on GroupState class. What
> state is it in now? Several people using this feature in prod?
>
>
> On Sat, Mar 28, 2020 at 6:23 PM Jungtaek Lim 
> wrote:
>
>> I have't heard known issue for this - that said, this may require new
>> investigation which is not possible or require huge effort without simple
>> reproducer.
>>
>> Contributors (who are basically volunteers) may not want to struggle to
>> reproduce from your partial information - I'd recommend you to spend your
>> time to help volunteers starting from simple reproducer, if you are stuck
>> at it and have to resolve it.
>>
>> Could you please get rid of the business logic which you may want to
>> redact, and provide full of source code which reproduces the bug?
>>
>> On Sat, Mar 28, 2020 at 8:11 PM Srinivas V  wrote:
>>
>>> Sorry for typos , correcting them below
>>>
>>> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:
>>>
 Sorry I was just changing some names not to send exact names. Please
 ignore that. I am really struggling with this since couple of days. Can
 this happen due to
 1. some of the values being null or
 2.UTF8  issue ? Or some serilization/ deserilization issue ?
 3. Not enough memory ?
 BTW, I am using same names in my code.

 On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
 kabhwan.opensou...@gmail.com> wrote:

> Well, the code itself doesn't seem to be OK - you're using
> ProductStateInformation as the class of State whereas you provide
> ProductSessionInformation to Encoder for State.
>
> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
> kabhwan.opensou...@gmail.com> wrote:
>
>> Could you play with Encoders.bean()? You can Encoders.bean() with
>> your class, and call .schema() with the return value to see how it
>> transforms to the schema in Spark SQL. The schema must be consistent 
>> across
>> multiple JVM runs to make it work properly, but I suspect it doesn't 
>> retain
>> the order.
>>
>> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V 
>> wrote:
>>
>>> I am listening to Kafka topic with a structured streaming
>>> application with Java,  testing it on my local Mac.
>>> When I retrieve back GroupState object
>>> with state.get(), it is giving some random values for the fields in the
>>> object, some are interchanging some are default and some are junk 
>>> values.
>>>
>>> See this example below:
>>> While setting I am setting:
>>> ProductSessionInformation{requestId='222112345',
>>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>>> numberOfEvents=1}
>>>
>>> When I retrieve it back, it comes like this:
>>> ProductSessionInformation{requestId='some junk characters are coming
>>> here' productId='222112345', priority='222112345',
>>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>>
>>> Any clue why it might be happening? I am stuck with this for couple
>>> of days. Immediate help is appreciated.
>>>
>>> code snippet:
>>>
>>>
>>> public class StateUpdateTask implements 
>>> MapGroupsWithStateFunction>> ProductSessionUpdate> {
>>>
>>>  @Override
>>> public ProductSessionUpdate call(String productId, Iterator 
>>> eventsIterator, GroupState state) throws 
>>> Exception {
>>> {
>>>
>>>
>>>
>>>   if (state.hasTimedOut()) {
>>>
>>> //
>>>
>>> }else{
>>>
>>> if (state.exists()) {
>>> ProductStateInformation oldSession = state.get();
>>> System.out.println("State for productId:"+productId + " with old 
>>> values "+oldSession);
>>>
>>> }
>>>
>>>
>>> public class EventsApp implements Serializable{
>>>
>>> public void run(String[] args) throws Exception {
>>>
>>> ...
>>>
>>>
>>> Dataset dataSet = sparkSession
>>> .readStream()
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", "localhost")
>>> .option("startingOffsets","latest")
>>

Re: spark structured streaming GroupState returns weird values from sate

2020-03-28 Thread Srinivas V
Ok, I will try to create some simple code to reproduce it, if I can. The
problem is that I am adding this code to an existing big project with
several dependencies, with an older Spark Streaming version (2.2) at the
root level, etc.

Also, I observed that the GroupState class is marked @Experimental. What
state is it in now? Are several people using this feature in prod?


On Sat, Mar 28, 2020 at 6:23 PM Jungtaek Lim 
wrote:

> I have't heard known issue for this - that said, this may require new
> investigation which is not possible or require huge effort without simple
> reproducer.
>
> Contributors (who are basically volunteers) may not want to struggle to
> reproduce from your partial information - I'd recommend you to spend your
> time to help volunteers starting from simple reproducer, if you are stuck
> at it and have to resolve it.
>
> Could you please get rid of the business logic which you may want to
> redact, and provide full of source code which reproduces the bug?
>
> On Sat, Mar 28, 2020 at 8:11 PM Srinivas V  wrote:
>
>> Sorry for typos , correcting them below
>>
>> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:
>>
>>> Sorry I was just changing some names not to send exact names. Please
>>> ignore that. I am really struggling with this since couple of days. Can
>>> this happen due to
>>> 1. some of the values being null or
>>> 2.UTF8  issue ? Or some serilization/ deserilization issue ?
>>> 3. Not enough memory ?
>>> BTW, I am using same names in my code.
>>>
>>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 Well, the code itself doesn't seem to be OK - you're using
 ProductStateInformation as the class of State whereas you provide
 ProductSessionInformation to Encoder for State.

 On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
 kabhwan.opensou...@gmail.com> wrote:

> Could you play with Encoders.bean()? You can Encoders.bean() with your
> class, and call .schema() with the return value to see how it transforms 
> to
> the schema in Spark SQL. The schema must be consistent across multiple JVM
> runs to make it work properly, but I suspect it doesn't retain the order.
>
> On Fri, Mar 27, 2020 at 10:28 PM Srinivas V 
> wrote:
>
>> I am listening to Kafka topic with a structured streaming application
>> with Java,  testing it on my local Mac.
>> When I retrieve back GroupState object
>> with state.get(), it is giving some random values for the fields in the
>> object, some are interchanging some are default and some are junk values.
>>
>> See this example below:
>> While setting I am setting:
>> ProductSessionInformation{requestId='222112345',
>> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
>> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
>> numberOfEvents=1}
>>
>> When I retrieve it back, it comes like this:
>> ProductSessionInformation{requestId='some junk characters are coming
>> here' productId='222112345', priority='222112345',
>> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
>> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>>
>> Any clue why it might be happening? I am stuck with this for couple
>> of days. Immediate help is appreciated.
>>
>> code snippet:
>>
>>
>> public class StateUpdateTask implements 
>> MapGroupsWithStateFunction> ProductSessionUpdate> {
>>
>>  @Override
>> public ProductSessionUpdate call(String productId, Iterator 
>> eventsIterator, GroupState state) throws 
>> Exception {
>> {
>>
>>
>>
>>   if (state.hasTimedOut()) {
>>
>> //
>>
>> }else{
>>
>> if (state.exists()) {
>> ProductStateInformation oldSession = state.get();
>> System.out.println("State for productId:"+productId + " with old 
>> values "+oldSession);
>>
>> }
>>
>>
>> public class EventsApp implements Serializable{
>>
>> public void run(String[] args) throws Exception {
>>
>> ...
>>
>>
>> Dataset dataSet = sparkSession
>> .readStream()
>> .format("kafka")
>> .option("kafka.bootstrap.servers", "localhost")
>> .option("startingOffsets","latest")
>> .option("failOnDataLoss", "false")
>> .option("subscribe", "topic1,topic2")
>> .option("includeTimestamp", true)
>>
>> .load();
>>
>>  eventsDS.groupByKey(
>> new MapFunction() {
>> @Override public String call(Event event) {
>> return event.getProductId();
>> }
>> }, Encoders.STRING())
>> .mapGroupsWithState(
>> new StateUpdateTask(3),
>> Encoders.bean(ProductSessionInformati

Re: spark structured streaming GroupState returns weird values from sate

2020-03-28 Thread Jungtaek Lim
I haven't heard of a known issue for this - that said, this may require a
new investigation, which is not possible, or would require huge effort,
without a simple reproducer.

Contributors (who are basically volunteers) may not want to struggle to
reproduce the problem from your partial information - I'd recommend you
spend your time helping volunteers by starting from a simple reproducer,
if you are stuck on it and have to resolve it.

Could you please get rid of the business logic which you may want to
redact, and provide the full source code which reproduces the bug?

On Sat, Mar 28, 2020 at 8:11 PM Srinivas V  wrote:

> Sorry for typos , correcting them below
>
> On Sat, Mar 28, 2020 at 4:39 PM Srinivas V  wrote:
>
>> Sorry, I was just changing some names so as not to send the exact names.
>> Please ignore that. I have really been struggling with this for a couple of
>> days. Can this happen due to
>> 1. some of the values being null, or
>> 2. a UTF-8 issue, or some serialization/deserialization issue, or
>> 3. not enough memory?
>> BTW, I am using the same names in my code.
>>
>> On Sat, Mar 28, 2020 at 10:50 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Well, the code itself doesn't seem to be OK - you're using
>>> ProductStateInformation as the class of State whereas you provide
>>> ProductSessionInformation to Encoder for State.
>>>
>>> On Fri, Mar 27, 2020 at 11:14 PM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 Could you play with Encoders.bean()? You can Encoders.bean() with your
 class, and call .schema() with the return value to see how it transforms to
 the schema in Spark SQL. The schema must be consistent across multiple JVM
 runs to make it work properly, but I suspect it doesn't retain the order.

 On Fri, Mar 27, 2020 at 10:28 PM Srinivas V 
 wrote:

> I am listening to Kafka topic with a structured streaming application
> with Java,  testing it on my local Mac.
> When I retrieve back GroupState object
> with state.get(), it is giving some random values for the fields in the
> object, some are interchanging some are default and some are junk values.
>
> See this example below:
> While setting I am setting:
> ProductSessionInformation{requestId='222112345',
> productId='222112345', priority='0', firstEventTimeMillis=1585312384,
> lastEventTimeMillis=1585312384, firstReceivedTimeMillis=1585312401693,
> numberOfEvents=1}
>
> When I retrieve it back, it comes like this:
> ProductSessionInformation{requestId='some junk characters are coming
> here' productId='222112345', priority='222112345',
> firstEventTimeMillis=1585312401693, lastEventTimeMillis=1,
> firstReceivedTimeMillis=1585312384, numberOfEvents=1}
>
> Any clue why it might be happening? I am stuck with this for couple of
> days. Immediate help is appreciated.
>
> code snippet:
>
>
> public class StateUpdateTask implements 
> MapGroupsWithStateFunction<String, Event, ProductStateInformation, ProductSessionUpdate> {
>
>  @Override
> public ProductSessionUpdate call(String productId, Iterator<Event> 
> eventsIterator, GroupState<ProductStateInformation> state) throws 
> Exception {
> {
>
>
>
>   if (state.hasTimedOut()) {
>
> //
>
> }else{
>
> if (state.exists()) {
> ProductStateInformation oldSession = state.get();
> System.out.println("State for productId:"+productId + " with old 
> values "+oldSession);
>
> }
>
>
> public class EventsApp implements Serializable{
>
> public void run(String[] args) throws Exception {
>
> ...
>
>
> Dataset dataSet = sparkSession
> .readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers", "localhost")
> .option("startingOffsets","latest")
> .option("failOnDataLoss", "false")
> .option("subscribe", "topic1,topic2")
> .option("includeTimestamp", true)
>
> .load();
>
>  eventsDS.groupByKey(
> new MapFunction<Event, String>() {
> @Override public String call(Event event) {
> return event.getProductId();
> }
> }, Encoders.STRING())
> .mapGroupsWithState(
> new StateUpdateTask(3),
> Encoders.bean(ProductSessionInformation.class),
> Encoders.bean(ProductSessionUpdate.class),
> GroupStateTimeout.ProcessingTimeTimeout());
>
> ...
>
>
> StreamingQuery query = productUpdates
> .writeStream()
> .foreach(new ForeachWriter<ProductSessionUpdate>() {
> @Override
> public boolean open(long l, long l1) {return true;}
>
> @Override
> public void process(ProductSessionUpdate 
> productSessionUpdate) {
> logger.info("-
