Re: A simple example that demonstrates that a Spark distributed cluster is faster than Spark Local Standalone

2020-09-24 Thread Sean Owen
If you have the same amount of resource (cores, memory, etc) on one
machine, that is pretty much always going to be faster than using
those same resources split across several machines.
Even if you have somewhat more resource available on a cluster, the
distributed version could be slower if you, for example, are
bottlenecking on network I/O and leaving some resources underutilized.

Distributing isn't really going to make the workload consume less
resource; on the contrary it makes it take more. However it might make
the total wall-clock completion time go way down through parallelism.
How much you benefit from parallelism really depends on the problem,
the cluster, the input, etc. You may not see a speedup in this problem
until you hit more scale or modify the job to distribute a little
better, etc.

On Thu, Sep 24, 2020 at 1:43 PM javaguy Java  wrote:
>
> Hi,
>
> I made a post on stackoverflow that I can't seem to make any headway on
> https://stackoverflow.com/questions/63834379/spark-performance-local-faster-than-cluster
>
> Before someone starts making suggestions on changing the code, note that the
> code and example in the above post are from a Udemy course and are not my
> code. I am looking to take this dataset and code and execute the same on a
> cluster. I want to see the value of Spark by seeing results where the job
> submitted to the Spark cluster runs faster than it does in Standalone.
>
> I am currently evaluating Spark and I've thus far spent about a month of 
> weekends of my free time trying to get a Spark Cluster to show me improved 
> performance in comparison to Spark Standalone but I am not having success, 
> and after spending so much time on this, I am now looking for help, as
> I'm time constrained (in general I'm time constrained, not for a project or
> deadline re: Spark).
>
> If anyone can comment on what I need to make my example work faster on a 
> spark cluster vs standalone I'd appreciate it.
>
> Alternatively if someone can point me to a simple code example + dataset that 
> works better and illustrates the power of distributed spark I'd be happy to 
> use that instead - I'm not wedded to this example that I got from the course 
> - I'm just looking for the simple 5 min to 30 min example quick start that 
> shows the power of Spark distributed clusters.
>
> There's a higher-level question here, and one for which it is not obvious to
> find an answer. There are many Spark examples out there, but there is not a
> simple large dataset + code example that illustrates the performance gain of
> Spark's cluster and distributed computing benefits vs just a single local
> standalone instance, which is what someone in my position is looking for
> (someone who makes architectural and platform decisions and is bandwidth /
> time constrained and wants to see the power and advantages of Spark cluster
> and distributed computing without spending weeks on the problem).
>
> I'm also willing to open this up to a consulting engagement if anyone is
> interested, as I'd expect it to be quick (either you have a simple example
> that just needs to be set up, or it's easy for you to demonstrate cluster
> performance > standalone for this dataset).
>
> Thx
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



A simple example that demonstrates that a Spark distributed cluster is faster than Spark Local Standalone

2020-09-24 Thread javaguy Java
Hi,

I made a post on stackoverflow that I can't seem to make any headway on
https://stackoverflow.com/questions/63834379/spark-performance-local-faster-than-cluster

Before someone starts making suggestions on changing the code, note that
the code and example in the above post are from a Udemy course and are not my
code. I am looking to take this dataset and code and execute the same on
a cluster. I want to see the value of Spark by seeing results where the job
submitted to the Spark cluster runs faster than it does in Standalone.

I am currently evaluating Spark and I've thus far spent about a month of
weekends of my free time trying to get a Spark Cluster to show me improved
performance in comparison to Spark Standalone but I am not having success,
and after spending so much time on this, I am now looking for help, as
I'm time constrained (in general I'm time constrained, not for a project or
deadline re: Spark).

If anyone can comment on what I need to make my example work faster on a
spark cluster vs standalone I'd appreciate it.

Alternatively if someone can point me to a simple code example + dataset
that works better and illustrates the power of distributed spark I'd be
happy to use that instead - I'm not wedded to this example that I got from
the course - I'm just looking for the simple 5 min to 30 min example quick
start that shows the power of Spark distributed clusters.

There's a higher-level question here, and one for which it is not obvious to
find an answer. There are many Spark examples out there, but there is not a
simple large dataset + code example that illustrates the performance gain
of Spark's cluster and distributed computing benefits vs just a single
local standalone instance, which is what someone in my position is looking for
(someone who makes architectural and platform decisions and is bandwidth /
time constrained and wants to see the power and advantages of Spark cluster
and distributed computing without spending weeks on the problem).

I'm also willing to open this up to a consulting engagement if anyone is
interested, as I'd expect it to be quick (either you have a simple example
that just needs to be set up, or it's easy for you to demonstrate cluster
performance > standalone for this dataset).

Thx


Re: Is RDD.persist honoured if multiple actions are executed in parallel

2020-09-24 Thread Michael Mior
If you want to ensure the persisted RDD has been calculated first,
just run foreach with a dummy function first to force evaluation.
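
For example, a minimal PySpark sketch of the idea (the input path and parse
step are just placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rdd = spark.sparkContext.textFile("/path/to/input")  # placeholder input
cached = rdd.persist()

# Force evaluation with a no-op foreach so the blocks are materialized in
# the cache before the parallel actions start.
cached.foreach(lambda _: None)

# ... now run the multiple actions (e.g. in separate threads) against `cached`.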

--
Michael Mior
michael.m...@gmail.com

On Thu, Sep 24, 2020 at 00:38, Arya Ketan  wrote:
>
> Thanks, we were able to validate the same behaviour.
>
> On Wed, 23 Sep 2020 at 18:05, Sean Owen  wrote:
>>
>> It is, but it happens asynchronously. If you access the same block twice
>> quickly, the cached block may not yet be available the second time.
>>
>> On Wed, Sep 23, 2020, 7:17 AM Arya Ketan  wrote:
>>>
>>> Hi,
>>> I have a Spark Streaming use-case (Spark 2.2.1), and in my Spark job I
>>> have multiple actions. I am running them in parallel by executing the
>>> actions in separate threads. I have an rdd.persist after which the DAG
>>> forks into multiple actions.
>>> But I see that RDD caching is not happening and the entire DAG is executed
>>> twice (once in each action).
>>>
>>> What am I missing?
>>> Arya
>>>
>>>
>>
>>
> --
> Arya

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Pyspark 3 Debug] Date values reset to Unix epoch

2020-09-24 Thread EveLiao
I can't see your code and return values. Can you post them again?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Pyspark 3 Debug] Date values reset to Unix epoch

2020-09-24 Thread Andrew Mullins
My apologies, my code sections were eaten.

Code:
import datetime as dt
import pyspark

def get_spark():
    return pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()

if __name__ == '__main__':
    spark = get_spark()
    table = spark.createDataFrame(
        [("1234", dt.date(2020, 5, 25))],
        ["id", "date"]
    )
    table.coalesce(1).createOrReplaceTempView("test_date")
    spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
    spark.sql(
        "CREATE TABLE IF NOT EXISTS "
        "test_db.test_date "
        "SELECT * FROM test_date"
    )

    print("Temp Table:")
    print(spark.table("test_date").collect())
    print("Final Table:")
    print(spark.table("test_db.test_date").collect())


Output:
Temp Table:
[Row(id='1234', date=datetime.date(2020, 5, 25))]
Final Table:
[Row(id='1234', date=datetime.date(1970, 1, 1))]

Best,
Andrew



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Let multiple jobs share one rdd?

2020-09-24 Thread Khalid Mammadov

Perhaps you can use Global Temp Views?

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.createGlobalTempView
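
A rough PySpark sketch of how that could look (the table, column, and view
names here are only placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Compute the shared dataframe once, then register it as a global temp view.
data = spark.sql("SELECT id, plan_id, status FROM source_table WHERE dt < '20200801'")
data.persist()
data.count()  # force evaluation so the shared work happens only once
data.createGlobalTempView("shared_input")

# Global temp views live in the reserved `global_temp` database and are
# visible to every SparkSession in the same Spark application:
spark.sql("SELECT * FROM global_temp.shared_input WHERE status = 3").show()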


On 24/09/2020 14:52, Gang Li wrote:

Hi all,

There are three jobs, and the first RDD is the same in each of them. Can the
first RDD be calculated once, and the subsequent operations then be
calculated in parallel?



My code is as follows:

# `spark` is assumed to be an existing SparkSession.
import threading

from pyspark import StorageLevel

sqls = ["""
    INSERT OVERWRITE TABLE `spark_input_test3` PARTITION (dt='20200917')
    SELECT id, plan_id, status, retry_times, start_time, schedule_datekey
    FROM temp_table WHERE status = 3""",
    """
    INSERT OVERWRITE TABLE `spark_input_test4` PARTITION (dt='20200917')
    SELECT id, cur_inst_id, status, update_time, schedule_time, task_name
    FROM temp_table WHERE schedule_time > '2020-09-01 00:00:00'
    """]

def multi_thread():
    sql = """SELECT id, plan_id, status, retry_times, start_time,
                    schedule_datekey, task_name, update_time, schedule_time,
                    cur_inst_id, scheduler_id
             FROM table
             WHERE dt < '20200801'"""
    data = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK)
    threads = []
    for i in range(2):
        try:
            t = threading.Thread(target=insert_overwrite_thread,
                                 args=(sqls[i], data,))
            t.start()
            threads.append(t)
        except Exception as x:
            print(x)
    for t in threads:
        t.join()

def insert_overwrite_thread(sql, data):
    data.createOrReplaceTempView('temp_table')
    spark.sql(sql)



Since Spark is lazy, the first RDD will still be calculated multiple
times when the jobs are submitted in parallel.
I would like to ask if there are other ways to do this. Thanks!

Cheers,
Gang Li



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Pyspark 3 Debug] Date values reset to Unix epoch

2020-09-24 Thread Andrew Mullins
I am encountering a bug with a broken unit test - it passes on Pyspark 2.4.4
but fails on Pyspark 3.0. I've managed to create a minimal reproducible
example of the issue.

The following code:


Returns the following on Pyspark 3:


On Pyspark 2.4.4, the final table has the correct date value.

Does anyone have any ideas what might be causing this?

Best,
Andrew Mullins



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Distribute entire columns to executors

2020-09-24 Thread Jeff Evans
I think you can just select the columns you need into new DataFrames, then
process those separately.

val dfFirstTwo = ds.select("Col1", "Col2")
// do whatever with this one
dfFirstTwo.sort(...)
// similar for the next two columns
val dfNextTwo = ds.select("Col3", "Col4")
dfNextTwo.sort(...)

These should result in separate tasks, which you could confirm by checking
the Spark UI when the application is submitted.

On Thu, Sep 24, 2020 at 7:01 AM Pedro Cardoso 
wrote:

> Hello,
>
> Is it possible in Spark to map partitions such that partitions are
> column-based and not row-based?
> My use-case is to compute temporal series of numerical values.
> I.e: Exponential moving averages over the values of a given dataset's
> column.
>
> Suppose there is a dataset with roughly 200 columns, a high percentage of
> which are numerical (> 60%) and at least one timestamp column, as shown in
> the attached file.
>
> I want to shuffle data to executors such that each executor has a smaller
> dataset with only 2 columns, [Col0: Timestamp, Col: Numerical type].
> Over which I can then sort the dataset by increasing timestamp and then
> iterate over the rows with a custom function which receives a tuple:
> {timestamp; value}.
>
> Partitioning by column value does not make sense for me since there is a
> temporal lineage of values which I must keep. On the other hand I would
> like to parallelize this workload as my datasets can be quite big (> 2
> billion rows). The only way I see how is to distribute the entire columns
> so that each executor has 2B timestamp + numerical values rather than
> 2B*size of an entire row.
>
> Is this possible in Spark? Can someone point in the right direction? A
> code snippet example (not working is fine if the logic is sound) would be
> highly appreciated!
>
> Thank you for your time.
> --
>
> *Pedro Cardoso*
>
> *Research Engineer*
>
> pedro.card...@feedzai.com
>
>
>
> *The content of this email is confidential and intended for the recipient
> specified in message only. It is strictly prohibited to share any part of
> this message with any third party, without a written consent of the sender.
> If you received this message by mistake, please reply to this message and
> follow with its deletion, so that we can ensure such a mistake does not
> occur in the future.*
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Let multiple jobs share one rdd?

2020-09-24 Thread Gang Li
Hi all,

There are three jobs, and the first RDD is the same in each of them. Can the
first RDD be calculated once, and the subsequent operations then be
calculated in parallel?


 

My code is as follows:

# `spark` is assumed to be an existing SparkSession.
import threading

from pyspark import StorageLevel

sqls = ["""
    INSERT OVERWRITE TABLE `spark_input_test3` PARTITION (dt='20200917')
    SELECT id, plan_id, status, retry_times, start_time, schedule_datekey
    FROM temp_table WHERE status = 3""",
    """
    INSERT OVERWRITE TABLE `spark_input_test4` PARTITION (dt='20200917')
    SELECT id, cur_inst_id, status, update_time, schedule_time, task_name
    FROM temp_table WHERE schedule_time > '2020-09-01 00:00:00'
    """]

def multi_thread():
    sql = """SELECT id, plan_id, status, retry_times, start_time,
                    schedule_datekey, task_name, update_time, schedule_time,
                    cur_inst_id, scheduler_id
             FROM table
             WHERE dt < '20200801'"""
    data = spark.sql(sql).persist(StorageLevel.MEMORY_AND_DISK)
    threads = []
    for i in range(2):
        try:
            t = threading.Thread(target=insert_overwrite_thread,
                                 args=(sqls[i], data,))
            t.start()
            threads.append(t)
        except Exception as x:
            print(x)
    for t in threads:
        t.join()

def insert_overwrite_thread(sql, data):
    data.createOrReplaceTempView('temp_table')
    spark.sql(sql)



Since Spark is lazy, the first RDD will still be calculated multiple
times when the jobs are submitted in parallel.
I would like to ask if there are other ways to do this. Thanks!

Cheers,
Gang Li



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Distribute entire columns to executors

2020-09-24 Thread Lalwani, Jayesh
You could convert columns to rows. Something like this (with import
spark.implicits._ in scope so the tuple encoder is available):

val cols = Seq("A", "B", "C")
df.flatMap(row => {
  cols.map(c => (row.getAs[java.sql.Timestamp]("timestamp"), row.getAs[Int](c), c))
}).toDF("timestamp", "value", "colName")

If you are using dataframes, all of your value columns need to be of the same
type. If they aren't, you will need to add logic to convert them to the same
type, or use a Dataset of tuples.
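
In PySpark, a similar unpivot can be sketched with the SQL stack() function
(a sketch only; the column names and the wide dataframe df are assumptions):

cols = ["A", "B", "C"]

# stack(n, name1, col1, name2, col2, ...) emits one row per (colName, value)
# pair; the value columns must share a compatible type.
stack_expr = "stack({n}, {args}) as (colName, value)".format(
    n=len(cols),
    args=", ".join("'{0}', {0}".format(c) for c in cols))

long_df = df.selectExpr("timestamp", stack_expr)
# long_df now has columns: timestamp, colName, value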

From: Pedro Cardoso 
Date: Thursday, September 24, 2020 at 8:02 AM
To: "user@spark.apache.org" 
Subject: [EXTERNAL] Distribute entire columns to executors




Hello,

Is it possible in Spark to map partitions such that partitions are column-based 
and not row-based?
My use-case is to compute temporal series of numerical values.
I.e: Exponential moving averages over the values of a given dataset's column.

Suppose there is a dataset with roughly 200 columns, a high percentage of which 
are numerical (> 60%) and at least one timestamp column, as shown in the 
attached file.

I want to shuffle data to executors such that each executor has a smaller 
dataset with only 2 columns, [Col0: Timestamp, Col: Numerical type]. Over 
which I can then sort the dataset by increasing timestamp and then iterate over 
the rows with a custom function which receives a tuple: {timestamp; value}.

Partitioning by column value does not make sense for me since there is a 
temporal lineage of values which I must keep. On the other hand I would like to 
parallelize this workload as my datasets can be quite big (> 2 billion rows). 
The only way I see how is to distribute the entire columns so that each 
executor has 2B timestamp + numerical values rather than 2B*size of an entire 
row.

Is this possible in Spark? Can someone point in the right direction? A code 
snippet example (not working is fine if the logic is sound) would be highly 
appreciated!

Thank you for your time.
--

Pedro Cardoso

Research Engineer



pedro.card...@feedzai.com




The content of this email is confidential and intended for the recipient 
specified in message only. It is strictly prohibited to share any part of this 
message with any third party, without a written consent of the sender. If you 
received this message by mistake, please reply to this message and follow with 
its deletion, so that we can ensure such a mistake does not occur in the future.



Re: Edge AI with Spark

2020-09-24 Thread Deepak Sharma
Near-edge would work in this case.
On-edge doesn't make much sense, especially for a distributed processing
framework such as Spark.

On Thu, Sep 24, 2020 at 3:12 PM Gourav Sengupta 
wrote:

> hi,
>
> It's better to use lighter frameworks at the edge. Some of the edge devices I
> work on run at over 40 to 50 degrees Celsius, so using lighter
> frameworks will be better for the health of the device.
>
> Regards,
> Gourav
>
> On Thu, Sep 24, 2020 at 8:42 AM ayan guha  wrote:
>
>> Too broad a question, and the short answer is yes and the long answer is it
>> depends.
>>
>> Essentially Spark is a compute engine, so it can be wrapped into any
>> containerized model and deployed at the edge. I believe there are various
>> implementations available.
>>
>>
>>
>> On Thu, 24 Sep 2020 at 5:19 pm, Marco Sassarini <
>> marco.sassar...@overit.it> wrote:
>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>> I'd like to know if Spark supports edge AI: can Spark
>>>
>>> run on physical device such as mobile devices running Android/iOS?
>>>
>>>
>>>
>>>
>>>
>>> Best regards,
>>>
>>>
>>> Marco Sassarini
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *Marco SassariniArtificial Intelligence Department*
>>>
>>>
>>>
>>>
>>>
>>>
>>> office: +39 0434 562 978
>>>
>>>
>>>
>>> www.overit.it
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>> Best Regards,
>> Ayan Guha
>>
>

-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Distribute entire columns to executors

2020-09-24 Thread Pedro Cardoso
Hello,

Is it possible in Spark to map partitions such that partitions are
column-based and not row-based?
My use-case is to compute temporal series of numerical values.
I.e: Exponential moving averages over the values of a given dataset's
column.

Suppose there is a dataset with roughly 200 columns, a high percentage of
which are numerical (> 60%) and at least one timestamp column, as shown in
the attached file.

I want to shuffle data to executors such that each executor has a smaller
dataset with only 2 columns, [Col0: Timestamp, Col: Numerical type].
Over which I can then sort the dataset by increasing timestamp and then
iterate over the rows with a custom function which receives a tuple:
{timestamp; value}.

Partitioning by column value does not make sense for me since there is a
temporal lineage of values which I must keep. On the other hand I would
like to parallelize this workload as my datasets can be quite big (> 2
billion rows). The only way I see how is to distribute the entire columns
so that each executor has 2B timestamp + numerical values rather than
2B*size of an entire row.

Is this possible in Spark? Can someone point in the right direction? A code
snippet example (not working is fine if the logic is sound) would be highly
appreciated!

Thank you for your time.
--

*Pedro Cardoso*

*Research Engineer*

pedro.card...@feedzai.com




*The content of this email is confidential and intended for the recipient
specified in message only. It is strictly prohibited to share any part of
this message with any third party, without a written consent of the sender.
If you received this message by mistake, please reply to this message and
follow with its deletion, so that we can ensure such a mistake does not
occur in the future.*


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Edge AI with Spark

2020-09-24 Thread Gourav Sengupta
hi,

It's better to use lighter frameworks at the edge. Some of the edge devices I
work on run at over 40 to 50 degrees Celsius, so using lighter
frameworks will be better for the health of the device.

Regards,
Gourav

On Thu, Sep 24, 2020 at 8:42 AM ayan guha  wrote:

> Too broad a question, and the short answer is yes and the long answer is it
> depends.
>
> Essentially Spark is a compute engine, so it can be wrapped into any
> containerized model and deployed at the edge. I believe there are various
> implementations available.
>
>
>
> On Thu, 24 Sep 2020 at 5:19 pm, Marco Sassarini 
> wrote:
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Hi,
>>
>>
>> I'd like to know if Spark supports edge AI: can Spark
>>
>> run on physical device such as mobile devices running Android/iOS?
>>
>>
>>
>>
>>
>> Best regards,
>>
>>
>> Marco Sassarini
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *Marco SassariniArtificial Intelligence Department*
>>
>>
>>
>>
>>
>>
>> office: +39 0434 562 978
>>
>>
>>
>> www.overit.it
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: Edge AI with Spark

2020-09-24 Thread ayan guha
Too broad a question, and the short answer is yes and the long answer is it
depends.

Essentially Spark is a compute engine, so it can be wrapped into any
containerized model and deployed at the edge. I believe there are various
implementations available.



On Thu, 24 Sep 2020 at 5:19 pm, Marco Sassarini 
wrote:

> Hi,
>
> I'd like to know if Spark supports edge AI: can Spark run on physical
> devices such as mobile devices running Android/iOS?
>
> Best regards,
> Marco Sassarini
>
> *Marco Sassarini
> Artificial Intelligence Department*
>
> office: +39 0434 562 978
> www.overit.it
> --
Best Regards,
Ayan Guha


Edge AI with Spark

2020-09-24 Thread Marco Sassarini
Hi,
I'd like to know if Spark supports edge AI: can Spark run on physical devices
such as mobile devices running Android/iOS?

Best regards,
Marco Sassarini



Marco Sassarini
Artificial Intelligence Department

office: +39 0434 562 978

www.overit.it