Re: Shutting down spark structured streaming when the streaming process completed current process

2021-04-23 Thread Mich Talebzadeh
I would like to hear comments on this: basically, the ability to shut down a
running Spark Structured Streaming process gracefully.

In a way it may be something worth integrating into Spark Structured
Streaming, much like the Kafka team is working to get rid of ZooKeeper and
replace it with a system-type topic.

I cannot think of any other way of shutting down an event-driven architecture
externally except by brute force (Ctrl-C etc.), which may leave the Spark
process aborted half way through.

Cheers


Mich


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 23 Apr 2021 at 10:36, Mich Talebzadeh 
wrote:

>
> Hi,
>
>
> This is the design that I came up with.
>
>
> How to shut down the topic doing work for the message being processed: wait
> for it to complete and then shut down the streaming process for a given topic.
>
>
> I thought about this and looked at options. Using sensors to
> implement this, as Airflow does, would be expensive: for example, polling a
> file in object storage or a flag in an underlying database would incur
> additional I/O overhead through continuous polling.
>
>
> So the design had to be incorporated into the streaming process itself.
> What I came up with was the addition of a control topic that keeps running,
> triggered every 2 seconds say, with messages in JSON format with the
> following structure:
>
>
> root
>  |-- newtopic_value: struct (nullable = true)
>  |    |-- uuid: string (nullable = true)
>  |    |-- timeissued: timestamp (nullable = true)
>  |    |-- queue: string (nullable = true)
>  |    |-- status: string (nullable = true)
>
> Above, queue refers to the business topic and status is set to
> 'true', meaning business as usual (BAU). This control topic stream can be
> restarted at any time, and status can be set to 'false' if we want to stop
> the streaming queue for a given business topic.
>
> ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
> {"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
> "timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}
>
> 64a8321c-1593-428b-ae65-89e45ddf0640
> {"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
> "timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}
>
> So how can I stop the business queue when the current business topic
> message has been processed? Let us say the source is sending data for a
> business topic every 30 seconds. Our control topic sends a one-liner as
> above every 2 seconds.
>
> In your writeStream, add the following to be able to identify the topic by
> name:
>
> trigger(processingTime='30 seconds'). \
> queryName('md'). \
>
> Next, the controlling topic (called newtopic) has the following:
>
> foreachBatch(sendToControl). \
> trigger(processingTime='2 seconds'). \
> queryName('newtopic'). \
>
> The method sendToControl does what is needed:
>
> from pyspark.sql.functions import col
>
> def sendToControl(dfnewtopic, batchId):
>     if len(dfnewtopic.take(1)) > 0:
>         #print(f"""newtopic batchId is {batchId}""")
>         #dfnewtopic.show(10, False)
>         queue = dfnewtopic.select(col("queue")).collect()[0][0]
>         status = dfnewtopic.select(col("status")).collect()[0][0]
>
>         if (queue == 'md') and (status == 'false'):
>             spark_session = s.spark_session(config['common']['appName'])
>             active = spark_session.streams.active
>             for e in active:
>                 #print(e)
>                 name = e.name
>                 if name == 'md':
>                     print(f"""Terminating streaming process {name}""")
>                     e.stop()
>     else:
>         print("DataFrame newtopic is empty")
>
> This seems to work: I checked it to ensure that in this case data was
> written and saved to the target sink (a BigQuery table). It waits until
> the data is written completely, meaning the current streaming message is
> processed, so there is some latency there.
>
> This is the output
>
> Terminating streaming process md
> wrote to DB  ## this is the flag I added to ensure the current
> micro-batch was completed
> 2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md [id
> = 6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
> 2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error
>
> The various termination processes are described in the
>
> Structured Streaming Programming Guide - Spark 3.1.1 Documentation
> (apache.org)
> 
>
> This is the idea I came up with, which allows ending the streaming process
> at the least cost.
>
> Ideas, opinions are welcome
>
>
> Cheers
>
>
>view my Linkedin profile
> 
>
>
>
> 

Accelerating Spark SQL / Dataframe using GPUs & Alluxio

2021-04-23 Thread Bin Fan
Hi Spark users,

We have been working on GPU acceleration for Apache Spark SQL / DataFrame
using the RAPIDS Accelerator for Apache Spark and the open source project
Alluxio, without any code changes.
Our preliminary results suggest a 2x improvement in performance and 70% in
ROI compared to a CPU-based cluster.

Feel free to read the developer blog for more details of the benchmark. If
you are interested in discussing this further with the authors, join our free
online meetup next Tuesday morning (April 27), Pacific time.

Best,

- Bin Fan


Re: java.lang.IllegalArgumentException: Unsupported class file major version 55

2021-04-23 Thread Sean Owen
This means you compiled with Java 11, but are running on Java < 11. It's
not related to Spark.

On Fri, Apr 23, 2021 at 10:23 AM chansonzhang 
wrote:

> I just updated the spark-* versions in my pom.xml to match my Spark and
> Scala environment, and this solved the problem.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


pyspark sql load with path of special character

2021-04-23 Thread Regin Quinoa
Hi, I am using pyspark sql to load files into a table following
```LOAD DATA LOCAL INPATH '/user/hive/warehouse/students' OVERWRITE INTO
TABLE test_load;```
https://spark.apache.org/docs/latest/sql-ref-syntax-dml-load.html

It fails with pyspark.sql.utils.AnalysisException: load data input path does
not exist when the path string has a timestamp in the directory structure,
like XX/XX/2021-03-02T20:04:27+00:00/file.parquet

It works with paths without a timestamp. How can I work around this?


Re: java.lang.IllegalArgumentException: Unsupported class file major version 55

2021-04-23 Thread chansonzhang
I just updated the spark-* versions in my pom.xml to match my Spark and Scala
environment, and this solved the problem.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Re: Spark Streaming with Files

2021-04-23 Thread Mich Talebzadeh
Interesting.

If we go back to the classic Lambda architecture on premises, you could use
the Flume API to Kafka to add files to HDFS on a time-series basis.

Most major CDC vendors do exactly that. Oracle GoldenGate (OGG) classic
gets data from Oracle redo logs and sends it to subscribers. One can
deploy OGG for Big Data to enable these files to be read and processed for
Kafka, Hive, HDFS etc.

So let us assume that we create these files and stream them to object
storage in the cloud. Then we can use Spark Structured Streaming (SSS) to act
as the ETL tool. Assuming the files arrive every 10 minutes, we can still
read them but ensure that we only trigger SSS reads every 4 hours.

 writeStream. \
 outputMode('append'). \
 option("truncate", "false"). \
 foreachBatch(sendToSink). \
 trigger(processingTime='14400 seconds'). \
 queryName('readFiles'). \
 start()

This will ensure that Spark only processes them every 4 hours.
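
For completeness, a rough sketch of what the file-source side might look
like. The path, schema, checkpoint location and sendToSink are assumed
examples here, not part of the actual setup, and trigger(once=True) could be
used instead if you prefer a pure "run once" job driven by a scheduler:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Example schema for the incoming files; adjust to the real CDC layout
fileSchema = StructType([
    StructField("id", StringType(), True),
    StructField("event_time", TimestampType(), True)
])

# 'spark' is assumed to be an existing SparkSession
streamingDF = spark.readStream. \
    format("parquet"). \
    schema(fileSchema). \
    option("maxFilesPerTrigger", 1000). \
    load("gs://my-bucket/landing/")

streamingDF.writeStream. \
    outputMode('append'). \
    option("truncate", "false"). \
    option("checkpointLocation", "gs://my-bucket/checkpoints/readFiles"). \
    foreachBatch(sendToSink). \
    trigger(processingTime='14400 seconds'). \
    queryName('readFiles'). \
    start()

The checkpoint location is what lets the file source keep track of which
files have already been processed between triggers.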


HTH

   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 23 Apr 2021 at 15:40, ayan guha  wrote:

> Hi
>
> In one of the Spark Summit demos, it was alluded that we should think of
> batch jobs in a streaming pattern, using "run once" on a schedule.
> I find this idea very interesting and I understand how this can be
> achieved for sources like Kafka, Kinesis or similar; in fact we have
> implemented this model for the Cosmos change feed.
>
> My question is: can this model extend to file-based sources? I understand
> it can for append-only file streams. The use case I have is: a CDC tool
> like AWS DMS or Shareplex or similar writing changes to a stream of files,
> in date-based folders. So it just goes on like T1, T2 etc. folders. Also,
> let's assume files are written every 10 mins, but I want to process them
> every 4 hours.
> Can I use the streaming method so that it can manage checkpoints on its own?
>
> Best - Ayan
> --
> Best Regards,
> Ayan Guha
>


Spark Streaming with Files

2021-04-23 Thread ayan guha
Hi

In one of the Spark Summit demos, it was alluded that we should think of
batch jobs in a streaming pattern, using "run once" on a schedule.
I find this idea very interesting and I understand how this can be achieved
for sources like Kafka, Kinesis or similar; in fact we have implemented
this model for the Cosmos change feed.

My question is: can this model extend to file-based sources? I understand
it can for append-only file streams. The use case I have is: a CDC tool
like AWS DMS or Shareplex or similar writing changes to a stream of files,
in date-based folders. So it just goes on like T1, T2 etc. folders. Also,
let's assume files are written every 10 mins, but I want to process them
every 4 hours.
Can I use the streaming method so that it can manage checkpoints on its own?

Best - Ayan
-- 
Best Regards,
Ayan Guha


Sleep behavior

2021-04-23 Thread Praneeth Shishtla
Hi,
We have a 6-node Spark cluster with some PySpark jobs running on it.
The jobs depend on an external application, and to have resiliency we retry
a couple of times.
Will it be fine to induce some wait time between two runs (using
time.sleep())? Or could there be any sync issues?
I wanted to understand the behaviour and any issues.
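
For concreteness, something like the following retry-with-sleep sketch is
what I have in mind (the function name, retry count and wait time are just
placeholders):

import time

def run_with_retries(job_fn, max_attempts=3, wait_seconds=60):
    # Run job_fn, retrying on failure with a fixed sleep between attempts.
    # time.sleep() only blocks the driver; executors simply stay idle meanwhile.
    for attempt in range(1, max_attempts + 1):
        try:
            return job_fn()
        except Exception as e:
            print(f"Attempt {attempt} failed: {e}")
            if attempt == max_attempts:
                raise
            time.sleep(wait_seconds)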

Thanks,
Praneeth





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Shutting down spark structured streaming when the streaming process completed current process

2021-04-23 Thread Mich Talebzadeh
Hi,


This is the design that I came up with.


How to shut down the topic doing work for the message being processed: wait
for it to complete and then shut down the streaming process for a given topic.


I thought about this and looked at options. Using sensors to implement this,
as Airflow does, would be expensive: for example, polling a file in object
storage or a flag in an underlying database would incur additional I/O
overhead through continuous polling.


So the design had to be incorporated into the streaming process itself.
What I came up with was the addition of a control topic that keeps running,
triggered every 2 seconds say, with messages in JSON format with the
following structure:


root
 |-- newtopic_value: struct (nullable = true)
 |    |-- uuid: string (nullable = true)
 |    |-- timeissued: timestamp (nullable = true)
 |    |-- queue: string (nullable = true)
 |    |-- status: string (nullable = true)

Above, queue refers to the business topic and status is set to
'true', meaning business as usual (BAU). This control topic stream can be
restarted at any time, and status can be set to 'false' if we want to stop
the streaming queue for a given business topic.

ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe
{"uuid":"ac7d0b2e-dc71-4b3f-a17a-500cd9d38efe",
"timeissued":"2021-04-23T08:54:06", "queue":"md", "status":"true"}

64a8321c-1593-428b-ae65-89e45ddf0640
{"uuid":"64a8321c-1593-428b-ae65-89e45ddf0640",
"timeissued":"2021-04-23T09:49:37", "queue":"md", "status":"false"}

So how can I stop the business queue when the current business topic
message has been processed? Let us say the source is sending data for a
business topic every 30 seconds. Our control topic sends a one-liner as
above every 2 seconds.

In your writeStream, add the following to be able to identify the topic by name:

trigger(processingTime='30 seconds'). \
queryName('md'). \
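
In context, the full business query might look roughly like this;
streamingMd (the parsed business DataFrame) and sendToSink are assumed names
here, not part of the snippet above:

streamingMd.writeStream. \
    outputMode('append'). \
    option("truncate", "false"). \
    foreachBatch(sendToSink). \
    trigger(processingTime='30 seconds'). \
    queryName('md'). \
    start()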

Next, the controlling topic (called newtopic) has the following:

foreachBatch(sendToControl). \
trigger(processingTime='2 seconds'). \
queryName('newtopic'). \

The method sendToControl does what is needed:

from pyspark.sql.functions import col

def sendToControl(dfnewtopic, batchId):
    if len(dfnewtopic.take(1)) > 0:
        #print(f"""newtopic batchId is {batchId}""")
        #dfnewtopic.show(10, False)
        queue = dfnewtopic.select(col("queue")).collect()[0][0]
        status = dfnewtopic.select(col("status")).collect()[0][0]

        if (queue == 'md') and (status == 'false'):
            spark_session = s.spark_session(config['common']['appName'])
            active = spark_session.streams.active
            for e in active:
                #print(e)
                name = e.name
                if name == 'md':
                    print(f"""Terminating streaming process {name}""")
                    e.stop()
    else:
        print("DataFrame newtopic is empty")

This seems to work: I checked it to ensure that in this case data was
written and saved to the target sink (a BigQuery table). It waits until
the data is written completely, meaning the current streaming message is
processed, so there is some latency there.

This is the output

Terminating streaming process md
wrote to DB  ## this is the flag I added to ensure the current micro-batch
was completed
2021-04-23 09:59:18,029 ERROR streaming.MicroBatchExecution: Query md [id =
6bbccbfe-e770-4fb0-b83d-0dedd0ee571b, runId =
2ae55673-6bc2-4dbe-af60-9fdc0447bff5] terminated with error

The various termination processes are described in the

Structured Streaming Programming Guide - Spark 3.1.1 Documentation
(apache.org)


This is the idea I came up with, which allows ending the streaming process
at the least cost.

Ideas, opinions are welcome


Cheers


   view my Linkedin profile




*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.