Re: How to explode array columns of a dataframe having the same length

2023-02-19 Thread 404




sql:   select inline(arrays_zip(col1, col2, col3)) as (c1, c2, c3) from t1
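
For reference, a minimal PySpark sketch showing the query above end-to-end (assuming Spark >= 2.4 for arrays_zip; the view name t1 is registered here to match the query):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(["A", "B", "null"], ["C", "D", "null"], ["E", "null", "null"])],
    ["col1", "col2", "col3"],
)
df.createOrReplaceTempView("t1")

# arrays_zip pairs the arrays element-wise into an array of structs;
# inline() then emits one output row per struct, one column per field
spark.sql(
    "select inline(arrays_zip(col1, col2, col3)) as (c1, c2, c3) from t1"
).show()
```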


 Replied Message 
| From | Enrico Minack |
| Date | 02/16/2023 16:06 |
| To | , sam smith |
| Subject | Re: How to explode array columns of a dataframe having the same length |
You have to take each row and zip the lists; each element of the result becomes
one new row.


So write a method that turns

  Row(List("A","B","null"), List("C","D","null"), List("E","null","null"))

into
  List(List("A","C","E"), List("B","D","null"), List("null","null","null"))

and use flatMap with that method.



In Scala, this would read:


// needs `import spark.implicits._` in scope for the tuple encoder
df.flatMap { row =>
  (row.getSeq[String](0), row.getSeq[String](1), row.getSeq[String](2)).zipped.toIterable
}.show()


Enrico




On 14.02.23 at 22:54, sam smith wrote:

Hello guys,


I have the following dataframe:


| col1             | col2             | col3                |
|------------------|------------------|---------------------|
| ["A","B","null"] | ["C","D","null"] | ["E","null","null"] |
I want to explode it to the following dataframe:


| col1   | col2   | col3   |
|--------|--------|--------|
| "A"    | "C"    | "E"    |
| "B"    | "D"    | "null" |
| "null" | "null" | "null" |


How can I do that (preferably in Java) using the explode() method, knowing that
something like the following won't yield the correct output:


for (String colName : dataset.columns())
    dataset = dataset.withColumn(colName, explode(dataset.col(colName)));
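
For completeness, a minimal PySpark sketch of the zip-then-explode approach
suggested in the replies above (assuming Spark >= 2.4, where arrays_zip names
the struct fields after its input columns):

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import arrays_zip, explode

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(["A", "B", "null"], ["C", "D", "null"], ["E", "null", "null"])],
    ["col1", "col2", "col3"],
)

# zip the arrays element-wise into an array of structs, emit one row per
# struct with explode(), then unpack the struct fields into the old names
result = (
    df.withColumn("z", explode(arrays_zip("col1", "col2", "col3")))
      .select("z.col1", "z.col2", "z.col3")
)
result.show()
```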








Re: Graceful shutdown SPARK Structured Streaming

2023-02-19 Thread Bjørn Jørgensen
What is it that stop(stopGraceFully=True) does then?
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/streaming/context.html#StreamingContext.stop
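
For reference, that stop() belongs to the legacy DStream API
(pyspark.streaming.StreamingContext), not to a Structured Streaming query; a
minimal sketch of its use, assuming an already-created SparkContext `sc`:

```
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=1)  # assumes an existing SparkContext `sc`
# ... define DStreams, call ssc.start() ...

# stopGraceFully=True waits for all already-received data to be processed
# before stopping, instead of stopping immediately
ssc.stop(stopSparkContext=True, stopGraceFully=True)
```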

On Wed, 8 Feb 2023 at 19:22, Brian Wylie wrote:

> It's been a few years (so this approach might be out of date) but here's
> what I used for PySpark as part of this SO (
> https://stackoverflow.com/questions/45717433/stop-structured-streaming-query-gracefully/65708677
> )
>
> ```
> import time
>
> # Helper method to stop a streaming query
> def stop_stream_query(query, wait_time):
>     """Stop a running streaming query"""
>     while query.isActive:
>         msg = query.status['message']
>         data_avail = query.status['isDataAvailable']
>         trigger_active = query.status['isTriggerActive']
>         if not data_avail and not trigger_active and msg != "Initializing sources":
>             print('Stopping query...')
>             query.stop()
>         time.sleep(0.5)
>
>     # Okay wait for the stop to happen
>     print('Awaiting termination...')
>     query.awaitTermination(wait_time)
> ```
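>
> A hypothetical call site for the helper above (note that in PySpark the
> awaitTermination timeout is in seconds):
>
> ```
> query = df.writeStream.format("console").start()
> # ... run for a while, then request a clean stop:
> stop_stream_query(query, wait_time=60)  # allow up to 60 seconds to terminate
> ```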
>
>
> I'd also be interested if there is a newer/better way to do this... so please
> cc me on updates :)
>
>
> On Thu, May 6, 2021 at 1:08 PM Mich Talebzadeh 
> wrote:
>
>> That is a valid question and I am not aware of any new addition to Spark
>> Structured Streaming (SSS) in newer releases for this graceful shutdown.
>>
>> Going back to my earlier explanation, there are occasions when you may
>> want to stop the Spark program gracefully. Gracefully meaning that the Spark
>> application handles the last streaming message completely and terminates the
>> application. This is different from invoking interrupts such as CTRL-C.
>> Of course one can terminate the process based on the following
>>
>>
>>    1. query.awaitTermination() # Waits for the termination of this query,
>>       with stop() or with error
>>    2. query.awaitTermination(timeoutMs) # Returns true if this query is
>>       terminated within the timeout in milliseconds.
>>
>> So the first one above waits until an interrupt signal is received. The
>> second one will count down the timeout and will exit when the timeout in
>> milliseconds is reached.
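>>
>> A minimal PySpark sketch of both variants (note that PySpark's
>> awaitTermination takes seconds, while the Scala API takes milliseconds):
>>
>> ```
>> query = df.writeStream.format("console").start()
>>
>> # variant 1: block until the query is stopped via stop() or fails
>> # query.awaitTermination()
>>
>> # variant 2: block for at most 10 minutes; returns True if the query
>> # terminated within the timeout, False otherwise
>> finished = query.awaitTermination(600)  # timeout in seconds in PySpark
>> if not finished:
>>     query.stop()  # timed out: ask the query to stop
>> ```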
>>
>> The issue is that one needs to predict how long the streaming job needs to
>> run. Clearly any interrupt at the terminal or OS level (kill process) may
>> end up terminating the processing without a proper completion of the
>> streaming process.
>> So I gather that if we agree on what constitutes a graceful shutdown, we
>> can consider both the tool offerings from Spark itself and the solutions we
>> can come up with.
>>
>> HTH
>>
>>
>>
>> view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 6 May 2021 at 13:28, ayan guha  wrote:
>>
>>> What are some other "newer" methodologies?
>>>
>>> Really interested to understand what is possible here, as this is a topic
>>> that has come up in this forum time and again.
>>>
>>> On Thu, 6 May 2021 at 5:13 pm, Gourav Sengupta <
>>> gourav.sengupta.develo...@gmail.com> wrote:
>>>
 Hi Mich,

 thanks a ton for your kind response; it looks like we are still using the
 earlier methodologies for stopping a Spark streaming program gracefully.


 Regards,
 Gourav Sengupta

 On Wed, May 5, 2021 at 6:04 PM Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

>
> Hi,
>
>
> I believe I discussed this in this forum. I sent the following to
> spark-dev forum as an add-on to Spark functionality. This is the gist of
> it.
>
>
> Spark Structured Streaming AKA SSS is a very useful tool in dealing
> with Event Driven Architecture. In an Event Driven Architecture, there is
> generally a main loop that listens for events and then triggers a 
> call-back
> function when one of those events is detected. In a streaming application,
> the application waits to receive the source messages at a set interval or
> whenever they happen, and reacts accordingly.
>
> There are occasions when you may want to stop the Spark program gracefully.
> Gracefully meaning that the Spark application handles the last streaming
> message completely and terminates the application. This is different from
> invoking interrupts such as CTRL-C. Of course one can terminate the process
> based on the following
>
>
>    1. query.awaitTermination() # Waits for the termination of this
>       query, with stop() or with error
>    2. query.awaitTermination(timeoutMs) # Returns true if this query is

Re: SPIP: Shutting down spark structured streaming when the streaming process completed current process

2023-02-19 Thread Mich Talebzadeh
Hi Dongjoon,

This was an oversight from my side. I confused your involvement with docker
build stuff.

HTH



   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 19 Feb 2023 at 01:02, Dongjoon Hyun  wrote:

> Thank you for considering me, but may I ask what made you think to put me
> there, Mich? I'm curious about your reason.
>
> > I have put dongjoon.hyun as a shepherd.
>
> BTW, unfortunately, I cannot help you with that due to my on-going
> personal stuff. I'll adjust the JIRA first.
>
> Thanks,
> Dongjoon.
>
>
> On Sat, Feb 18, 2023 at 10:51 AM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> https://issues.apache.org/jira/browse/SPARK-42485
>>
>>
>> Spark Structured Streaming is a very useful tool in dealing with Event
>> Driven Architecture. In an Event Driven Architecture, there is generally a
>> main loop that listens for events and then triggers a call-back function
>> when one of those events is detected. In a streaming application, the
>> application waits to receive the source messages at a set interval or
>> whenever they happen, and reacts accordingly.
>>
>> There are occasions when you may want to stop the Spark program gracefully.
>> Gracefully meaning that the Spark application handles the last streaming
>> message completely and terminates the application. This is different from
>> invoking interrupts such as CTRL-C.
>>
>> Of course one can terminate the process based on the following
>>
>>    1. query.awaitTermination() # Waits for the termination of this query,
>>       with stop() or with error
>>    2. query.awaitTermination(timeoutMs) # Returns true if this query is
>>       terminated within the timeout in milliseconds.
>>
>> So the first one above waits until an interrupt signal is received. The
>> second one will count down the timeout and will exit when the timeout in
>> milliseconds is reached.
>>
>> The issue is that one needs to predict how long the streaming job needs to
>> run. Clearly any interrupt at the terminal or OS level (kill process) may
>> end up terminating the processing without a proper completion of the
>> streaming process.
>>
>> I have devised a method that allows one to terminate the Spark application
>> internally after processing the last received message. Within, say, 2
>> seconds of the confirmation of shutdown, the process will invoke a graceful
>> shutdown.
>>
>> This new feature proposes a solution that gracefully handles the message
>> currently being processed for a given topic, waits for it to complete, and
>> shuts down the streaming process for that topic without loss of data or
>> orphaned transactions. A rough sketch of the idea follows below.
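>>
>> As an illustration only (the SPIP doc is TBC, so this is not the SPIP's
>> actual design), one hypothetical way to realize such an internal shutdown
>> in PySpark is to poll an external stop marker and stop the query only when
>> no trigger is active:
>>
>> ```
>> import os
>> import time
>>
>> def run_until_stop_marker(query, marker_path="/tmp/stop_streaming"):
>>     """Run `query` until `marker_path` appears, then stop it gracefully."""
>>     while query.isActive:
>>         status = query.status
>>         if (os.path.exists(marker_path)
>>                 and not status["isTriggerActive"]
>>                 and not status["isDataAvailable"]):
>>             query.stop()  # current batch has completed; safe to stop now
>>         time.sleep(2)  # poll the shutdown confirmation every 2 seconds
>>     query.awaitTermination()
>> ```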
>>
>>
>> I have put dongjoon.hyun as a shepherd. Kindly advise me if that is the
>> correct approach.
>>
>> JIRA ticket https://issues.apache.org/jira/browse/SPARK-42485
>>
>> SPIP doc: TBC
>>
>> Discussion thread: in
>>
>> https://lists.apache.org/list.html?d...@spark.apache.org
>>
>>
>> Thanks.
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>