Re: Spark, read from Kafka stream failing AnalysisException

2020-04-05 Thread Tathagata Das
Have you looked at the suggestion in the error message? Searching for
"Structured Streaming + Kafka Integration Guide" in Google should return the
guide as the first result. Its last section specifies how to add the
spark-sql-kafka Maven dependency when starting pyspark.
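For reference, a minimal sketch of pulling that dependency into PySpark. The
artifact coordinates below are an assumption based on Spark 2.4.5 built against
Scala 2.11 — adjust them to match your build:

# Either pass the package on the command line, e.g.
#   pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5
# or set spark.jars.packages before the session (and its JVM) is created:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("kafka-stream")
         .config("spark.jars.packages",
                 "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5")
         .getOrCreate())

With the package on the classpath, format("kafka") should resolve instead of
raising the AnalysisException.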


On Sun, Apr 5, 2020 at 11:11 AM Sumit Agrawal  wrote:

> Hello,
>
> I am using Spark 2.4.5, Kafka 2.3.1 on my local machine.
>
> I am able to produce and consume messages on Kafka with bootstrap server
> config "localhost:9092”
>
> While trying to set up a reader with the Spark streaming API, I am getting the
> error below.
>
> Exception Message:
> Py4JJavaError: An error occurred while calling o166.load.
> : org.apache.spark.sql.AnalysisException: Failed to find data source:
> kafka. Please deploy the application as per the deployment section of
> "Structured Streaming + Kafka Integration Guide".;
>
> Spark Code I am trying to execute:
> df1 = spark.readStream.format("kafka")\
>   .option("kafka.bootstrap.servers", "localhost:9092")\
>   .option("subscribe", "topic1")\
>   .load()
>
> Any guidelines would help.
>
> Regards,
> Sumit
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: spark-submit exit status on k8s

2020-04-05 Thread Yinan Li
Not sure if you are aware of this new feature in Airflow
https://issues.apache.org/jira/browse/AIRFLOW-6542. It's a way to use
Airflow to orchestrate spark applications run using the Spark K8S operator (
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator).


On Sun, Apr 5, 2020 at 8:25 AM Masood Krohy 
wrote:

> Another, simpler solution that I just thought of: just add an operation at
> the end of your Spark program to write an empty file somewhere, with
> filename SUCCESS for example. Add a stage to your AirFlow graph to check
> the existence of this file after running spark-submit. If the file is
> absent, then the Spark app must have failed.
>
> The above should work if you want to avoid dealing with the REST API for
> monitoring.
>
> Masood
>
> __
>
> Masood Krohy, Ph.D.
> Data Science Advisor | Platform Architect
> https://www.analytical.works
>
> On 4/4/20 10:54 AM, Masood Krohy wrote:
>
> I'm not in the Spark dev team, so cannot tell you why that priority was
> chosen for the JIRA issue or if anyone is about to finish the work on that;
> I'll let others jump in if they know.
>
> Just wanted to offer a potential solution so that you can move ahead in
> the meantime.
>
> Masood
>
> __
>
> Masood Krohy, Ph.D.
> Data Science Advisor | Platform Architect
> https://www.analytical.works
>
> On 4/4/20 7:49 AM, Marshall Markham wrote:
>
> Thank you very much Masood for your fast response. Last question, is the
> current status in Jira representative of the status of the ticket within
> the project team? This seems like a big deal for the K8s implementation and
> we were surprised to find it marked as priority low. Is there any
> discussion of picking up this work in the near future?
>
>
>
> Thanks,
>
> Marshall
>
>
>
> *From:* Masood Krohy
> *Sent:* Friday, April 3, 2020 9:34 PM
> *To:* Marshall Markham; user
> *Subject:* Re: spark-submit exit status on k8s
>
>
>
> While you wait for a fix on that JIRA ticket, you may be able to add an
> intermediary step in your AirFlow graph, calling Spark's REST API after
> submitting the job, and dig into the actual status of the application, and
> make a success/fail decision accordingly. You can make repeated calls in a
> loop to the REST API with few seconds delay between each call while the
> execution is in progress until the application fails or succeeds.
>
> https://spark.apache.org/docs/latest/monitoring.html#rest-api
> 
>
> Hope this helps.
>
> Masood
>
> __
>
>
>
> Masood Krohy, Ph.D.
>
> Data Science Advisor|Platform Architect
>
> https://www.analytical.works 
> 
>
> On 4/3/20 8:23 AM, Marshall Markham wrote:
>
> Hi Team,
>
>
>
> My team recently conducted a POC of Kubernetes/Airflow/Spark with great
> success. The major concern we have about this system, after the completion
> of our POC is a behavior of spark-submit. When called with a Kubernetes API
> endpoint as master spark-submit seems to always return exit status 0. This
> is obviously a major issue preventing us from conditioning job graphs on
> the success or failure of our Spark jobs. I found Jira ticket SPARK-27697
> under the Apache issues covering this bug. The ticket is listed as minor
> and does not seem to have any activity recently. I would like to up vote it
> and ask if there is anything I can do to move this forward. This could be
> the one thing standing between my team and our preferred batch workload
> implementation. Thank you.
>
>
>
> *Marshall Markham*
>
> Data Engineer
>
> PrecisionLender, a Q2 Company
>
>
>
> NOTE: This communication and any attachments are for the sole use of the
> intended recipient(s) and may contain confidential and/or privileged
> information. Any unauthorized review, use, disclosure or distribution is
> prohibited. If you are not the intended recipient, please contact the
> sender by replying to this email, and destroy all copies of the original
> message.
>
>
>


Re: pandas_udf is very slow

2020-04-05 Thread Lian Jiang
Thanks Silvio. I need a grouped map pandas UDF, which takes a Spark DataFrame as
the input and outputs a Spark DataFrame with a different shape from the input.
Grouped map is fairly unique to pandas UDFs, and I have had trouble finding a
similar non-pandas UDF for an apples-to-apples comparison. Let me know if you
have a better idea for investigating the grouped map pandas UDF slowness.

One potential workaround could be grouping the 250M records by id and, for each
group, doing groupby('id').apply(pd_udf). I am not sure which way is more
promising compared with repartition + mapPartitions, reduceByKey, or combineByKey.
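Roughly, the repartition + mapPartitions route would look like the sketch below.
It assumes the (id, txt) columns from my example and uses a placeholder where the
real per-group pandas logic would go:

import pandas as pd
from pyspark.sql.types import StructType, StructField, LongType

def process_partition(pairs):
    # One pandas frame per partition; loop over the ids that landed here.
    pdf = pd.DataFrame(list(pairs), columns=["id", "txt"])
    for key, group in pdf.groupby("id"):
        # ... the real complicated per-group logic would go here ...
        yield (int(key),)

out_schema = StructType([StructField("id", LongType())])
result = (df.rdd
            .map(lambda r: (r["id"], r["txt"]))   # pair RDD keyed by id
            .partitionBy(140)                     # co-locate each id
            .mapPartitions(process_partition)
            .toDF(out_schema))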

Appreciate any clue.

Sent from my iPhone

> On Apr 5, 2020, at 6:18 AM, Silvio Fiorito  
> wrote:
> 
> 
> Your 2 examples are doing different things.
>  
> The Pandas UDF is doing a grouped map, whereas your Python UDF is doing an 
> aggregate.
>  
> I think you want your Pandas UDF to be PandasUDFType.GROUPED_AGG? Is your 
> result the same?
>  
> From: Lian Jiang 
> Date: Sunday, April 5, 2020 at 3:28 AM
> To: user 
> Subject: pandas_udf is very slow
>  
> Hi,
>  
> I am using pandas udf in pyspark 2.4.3 on EMR 5.21.0. pandas udf is favored 
> over non pandas udf per 
> https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf.
>  
> My data has about 250M records and the pandas udf code is like:
>  
> def pd_udf_func(data):
> return pd.DataFrame(["id"])
> 
> pd_udf = pandas_udf(
> pd_udf_func,
> returnType=("id int"),
> functionType=PandasUDFType.GROUPED_MAP
> )
> df3 = df.groupBy("id").apply(pd_udf)
> df3.explain()
> """
> == Physical Plan ==
> FlatMapGroupsInPandas [id#9L], pd_udf_func(id#9L, txt#10), [id#30]
> +- *(2) Sort [id#9L ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(id#9L, 200)
>   +- *(1) Project [id#9L, id#9L, txt#10]
>  +- Scan ExistingRDD[id#9L,txt#10]
> """
>  
> As you can see, this pandas udf does nothing but returning a row having a 
> pandas dataframe having None values. In reality, this pandas udf has 
> complicated logic (e.g. iterate through the pandas dataframe rows and do some 
> calculation). This simplification is to reduce noise caused by application 
> specific logic. This pandas udf takes hours to run using 10 executors (14 
> cores and 64G mem each). On the other hand, below non-pandas udf can finish 
> in minutes:
>  
> def udf_func(data_list):
> return "hello"
> 
> udf = udf(udf_func, StringType())
> df2 = 
> df.groupBy("id").agg(F.collect_list('txt').alias('txt1')).withColumn('udfadd',
>  udf('txt1'))
> df2.explain()
> """
> == Physical Plan ==
> *(1) Project [id#9L, txt1#16, pythonUDF0#24 AS udfadd#20]
> +- BatchEvalPython [udf_func(txt1#16)], [id#9L, txt1#16, pythonUDF0#24]
>+- ObjectHashAggregate(keys=[id#9L], functions=[collect_list(txt#10, 0, 
> 0)])
>   +- Exchange hashpartitioning(id#9L, 200)
>  +- ObjectHashAggregate(keys=[id#9L], 
> functions=[partial_collect_list(txt#10, 0, 0)])
> +- Scan ExistingRDD[id#9L,txt#10]
> """
>  
> The physical plans show pandas udf uses sortAggregate (slower) while 
> non-pandas udf uses objectHashAggregate (faster).
>  
> Below is what I have tried to improve the performance of pandas udf but none 
> of them worked:
> 1. repartition before groupby. For example, df.repartition(140, 
> "id").groupBy("id").apply(pd_udf). 140 is the same as 
> spark.sql.shuffle.partitions. I hope groupby can benefit from the repartition 
> but according to the execution plan the repartition seems to be ignored since 
> groupby will do partitioning itself.
> 
> 
> 2. although this slowness is more likely caused by pandas udf instead of 
> groupby, I still played with shuffle settings such as 
> spark.shuffle.compress=True, spark.shuffle.spill.compress=True.
> 
> 
> 3. I played with serDe settings such as 
> spark.serializer=org.apache.spark.serializer.KryoSerializer. Also I tried 
> pyarrow settings such as spark.sql.execution.arrow.enabled=True and 
> spark.sql.execution.arrow.maxRecordsPerBatch=10
> 
> 
> 4. I tried to replace the solution of "groupby + pandas udf " with 
> combineByKey, reduceByKey, repartition + mapPartition. But it is not easy 
> since the pandas udf has complicated logic.
> 
>  
> My questions:
>  
> 1. why pandas udf is so slow?
> 2. is there a way to improve the performance of pandas_udf?
> 3. in case it is a known issue of pandas udf, what other remedy I can use? I 
> guess I need to think harder on combineByKey, reduceByKey, repartition + 
> mapPartition. But want to know if I missed anything obvious.
>  
> Any clue is highly appreciated.
>  
> Thanks
> Leon
>  
>  
>  
>  


Re: spark-submit exit status on k8s

2020-04-05 Thread Masood Krohy
Another, simpler solution that I just thought of: just add an operation 
at the end of your Spark program to write an empty file somewhere, with 
filename SUCCESS for example. Add a stage to your AirFlow graph to check 
the existence of this file after running spark-submit. If the file is 
absent, then the Spark app must have failed.


The above should work if you want to avoid dealing with the REST API for 
monitoring.
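In PySpark the marker write can be as small as the sketch below — the path is a
placeholder, and it leans on the internal spark._jvm / spark._jsc handles to
reach the driver's Hadoop FileSystem:

# Last statement of the Spark application; if anything above threw,
# this never runs and the marker is never written.
marker = "hdfs:///jobs/my-app/SUCCESS"   # placeholder path

jvm = spark._jvm
path = jvm.org.apache.hadoop.fs.Path(marker)
fs = path.getFileSystem(spark._jsc.hadoopConfiguration())
fs.create(path, True).close()   # True = overwrite an old marker

The Airflow task that follows spark-submit then only needs an existence check
(for example an HdfsSensor or S3KeySensor, depending on where the marker lives).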


Masood

__

Masood Krohy, Ph.D.
Data Science Advisor|Platform Architect
https://www.analytical.works

On 4/4/20 10:54 AM, Masood Krohy wrote:


I'm not in the Spark dev team, so cannot tell you why that priority 
was chosen for the JIRA issue or if anyone is about to finish the work 
on that; I'll let others jump in if they know.


Just wanted to offer a potential solution so that you can move ahead 
in the meantime.


Masood

__

Masood Krohy, Ph.D.
Data Science Advisor|Platform Architect
https://www.analytical.works
On 4/4/20 7:49 AM, Marshall Markham wrote:


Thank you very much Masood for your fast response. Last question, is 
the current status in Jira representative of the status of the ticket 
within the project team? This seems like a big deal for the K8s 
implementation and we were surprised to find it marked as priority 
low. Is there any discussion of picking up this work in the near future?


Thanks,

Marshall

*From:* Masood Krohy
*Sent:* Friday, April 3, 2020 9:34 PM
*To:* Marshall Markham; user

*Subject:* Re: spark-submit exit status on k8s

While you wait for a fix on that JIRA ticket, you may be able to add 
an intermediary step in your AirFlow graph, calling Spark's REST API 
after submitting the job, and dig into the actual status of the 
application, and make a success/fail decision accordingly. You can 
make repeated calls in a loop to the REST API with few seconds delay 
between each call while the execution is in progress until the 
application fails or succeeds.


https://spark.apache.org/docs/latest/monitoring.html#rest-api 
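A rough sketch of that polling loop is below; the base URL and the
success/failure rule are assumptions to adapt to your setup (driver UI,
history server, standalone master, etc.):

import time
import requests

API = "http://<driver-or-history-server>:18080/api/v1"   # placeholder

def spark_app_succeeded(app_id, poll_seconds=15):
    # Poll until no job is RUNNING any more, then report success only if
    # no job ended up FAILED.
    while True:
        jobs = requests.get(f"{API}/applications/{app_id}/jobs").json()
        statuses = {j["status"] for j in jobs}
        if "RUNNING" not in statuses:
            return "FAILED" not in statuses
        time.sleep(poll_seconds)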



Hope this helps.

Masood

__
Masood Krohy, Ph.D.
Data Science Advisor|Platform Architect
https://www.analytical.works  


On 4/3/20 8:23 AM, Marshall Markham wrote:

Hi Team,

My team recently conducted a POC of Kubernetes/Airflow/Spark with
great success. The major concern we have about this system, after
the completion of our POC is a behavior of spark-submit. When
called with a Kubernetes API endpoint as master spark-submit
seems to always return exit status 0. This is obviously a major
issue preventing us from conditioning job graphs on the success
or failure of our Spark jobs. I found Jira ticket SPARK-27697
under the Apache issues covering this bug. The ticket is listed
as minor and does not seem to have any activity recently. I would
like to up vote it and ask if there is anything I can do to move
this forward. This could be the one thing standing between my
team and our preferred batch workload implementation. Thank you.

*Marshall Markham*

Data Engineer

PrecisionLender, a Q2 Company

NOTE: This communication and any attachments are for the sole use
of the intended recipient(s) and may contain confidential and/or
privileged information. Any unauthorized review, use, disclosure
or distribution is prohibited. If you are not the intended
recipient, please contact the sender by replying to this email,
and destroy all copies of the original message.



Spark, read from Kafka stream failing AnalysisException

2020-04-05 Thread Sumit Agrawal
Hello,

I am using Spark 2.4.5, Kafka 2.3.1 on my local machine.

I am able to produce and consume messages on Kafka with bootstrap server config 
"localhost:9092”

While trying to set up a reader with the Spark streaming API, I am getting the error below.

Exception Message:
Py4JJavaError: An error occurred while calling o166.load.
: org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. 
Please deploy the application as per the deployment section of "Structured 
Streaming + Kafka Integration Guide".;

Spark Code I am trying to execute: 
df1 = spark.readStream.format("kafka")\
  .option("kafka.bootstrap.servers", "localhost:9092")\
  .option("subscribe", "topic1")\
  .load()

Any guidelines would help.

Regards,
Sumit



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: pandas_udf is very slow

2020-04-05 Thread Silvio Fiorito
Your 2 examples are doing different things.

The Pandas UDF is doing a grouped map, whereas your Python UDF is doing an 
aggregate.

I think you want your Pandas UDF to be PandasUDFType.GROUPED_AGG? Is your 
result the same?
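For what it's worth, a minimal sketch of the GROUPED_AGG shape I mean is below;
the column names follow your example and the aggregation body is just a stand-in:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("string", PandasUDFType.GROUPED_AGG)
def concat_txt(txt):
    # Receives a pandas Series per group and returns one scalar, so it can
    # be used inside groupBy(...).agg(...) like a built-in aggregate.
    return " ".join(txt)

df2 = df.groupBy("id").agg(concat_txt("txt").alias("txt1"))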

From: Lian Jiang 
Date: Sunday, April 5, 2020 at 3:28 AM
To: user 
Subject: pandas_udf is very slow

Hi,

I am using pandas udf in pyspark 2.4.3 on EMR 5.21.0. pandas udf is favored 
over non pandas udf per 
https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf.

My data has about 250M records and the pandas udf code is like:

def pd_udf_func(data):
return pd.DataFrame(["id"])

pd_udf = pandas_udf(
pd_udf_func,
returnType=("id int"),
functionType=PandasUDFType.GROUPED_MAP
)
df3 = df.groupBy("id").apply(pd_udf)
df3.explain()
"""
== Physical Plan ==
FlatMapGroupsInPandas [id#9L], pd_udf_func(id#9L, txt#10), [id#30]
+- *(2) Sort [id#9L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#9L, 200)
  +- *(1) Project [id#9L, id#9L, txt#10]
 +- Scan ExistingRDD[id#9L,txt#10]
"""

As you can see, this pandas udf does nothing but returning a row having a 
pandas dataframe having None values. In reality, this pandas udf has 
complicated logic (e.g. iterate through the pandas dataframe rows and do some 
calculation). This simplification is to reduce noise caused by application 
specific logic. This pandas udf takes hours to run using 10 executors (14 cores 
and 64G mem each). On the other hand, below non-pandas udf can finish in 
minutes:

def udf_func(data_list):
return "hello"

udf = udf(udf_func, StringType())
df2 = 
df.groupBy("id").agg(F.collect_list('txt').alias('txt1')).withColumn('udfadd', 
udf('txt1'))
df2.explain()
"""
== Physical Plan ==
*(1) Project [id#9L, txt1#16, pythonUDF0#24 AS udfadd#20]
+- BatchEvalPython [udf_func(txt1#16)], [id#9L, txt1#16, pythonUDF0#24]
   +- ObjectHashAggregate(keys=[id#9L], functions=[collect_list(txt#10, 0, 0)])
  +- Exchange hashpartitioning(id#9L, 200)
 +- ObjectHashAggregate(keys=[id#9L], 
functions=[partial_collect_list(txt#10, 0, 0)])
+- Scan ExistingRDD[id#9L,txt#10]
"""

The physical plans show pandas udf uses sortAggregate (slower) while non-pandas 
udf uses objectHashAggregate (faster).

Below is what I have tried to improve the performance of pandas udf but none of 
them worked:
1. repartition before groupby. For example, df.repartition(140, 
"id").groupBy("id").apply(pd_udf). 140 is the same as 
spark.sql.shuffle.partitions. I hope groupby can benefit from the repartition 
but according to the execution plan the repartition seems to be ignored since 
groupby will do partitioning itself.


2. although this slowness is more likely caused by pandas udf instead of 
groupby, I still played with shuffle settings such as 
spark.shuffle.compress=True, spark.shuffle.spill.compress=True.


3. I played with serDe settings such as 
spark.serializer=org.apache.spark.serializer.KryoSerializer. Also I tried 
pyarrow settings such as spark.sql.execution.arrow.enabled=True and 
spark.sql.execution.arrow.maxRecordsPerBatch=10


4. I tried to replace the solution of "groupby + pandas udf " with 
combineByKey, reduceByKey, repartition + mapPartition. But it is not easy since 
the pandas udf has complicated logic.


My questions:

1. why pandas udf is so slow?
2. is there a way to improve the performance of pandas_udf?
3. in case it is a known issue of pandas udf, what other remedy I can use? I 
guess I need to think harder on combineByKey, reduceByKey, repartition + 
mapPartition. But want to know if I missed anything obvious.

Any clue is highly appreciated.

Thanks
Leon






pandas_udf is very slow

2020-04-05 Thread Lian Jiang
Hi,

I am using pandas udf in pyspark 2.4.3 on EMR 5.21.0. pandas udf is favored
over non pandas udf per
https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf.


My data has about 250M records and the pandas udf code is like:

def pd_udf_func(data):
return pd.DataFrame(["id"])

pd_udf = pandas_udf(
pd_udf_func,
returnType=("id int"),
functionType=PandasUDFType.GROUPED_MAP
)
df3 = df.groupBy("id").apply(pd_udf)
df3.explain()
"""
== Physical Plan ==
FlatMapGroupsInPandas [id#9L], pd_udf_func(id#9L, txt#10), [id#30]
+- *(2) Sort [id#9L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#9L, 200)
  +- *(1) Project [id#9L, id#9L, txt#10]
 +- Scan ExistingRDD[id#9L,txt#10]
"""

As you can see, this pandas udf does nothing but return a pandas dataframe
containing None values. In reality, this pandas udf has complicated logic
(e.g. iterating through the pandas dataframe rows and doing some calculation);
the simplification is to reduce noise caused by application-specific logic.
This pandas udf takes hours to run using 10 executors (14 cores and 64G mem
each). On the other hand, the non-pandas udf below can finish in minutes:

def udf_func(data_list):
return "hello"

udf = udf(udf_func, StringType())
df2 =
df.groupBy("id").agg(F.collect_list('txt').alias('txt1')).withColumn('udfadd',
udf('txt1'))
df2.explain()
"""
== Physical Plan ==
*(1) Project [id#9L, txt1#16, pythonUDF0#24 AS udfadd#20]
+- BatchEvalPython [udf_func(txt1#16)], [id#9L, txt1#16, pythonUDF0#24]
   +- ObjectHashAggregate(keys=[id#9L], functions=[collect_list(txt#10, 0,
0)])
  +- Exchange hashpartitioning(id#9L, 200)
 +- ObjectHashAggregate(keys=[id#9L],
functions=[partial_collect_list(txt#10, 0, 0)])
+- Scan ExistingRDD[id#9L,txt#10]
"""

The physical plans show pandas udf uses sortAggregate (slower) while
non-pandas udf uses objectHashAggregate (faster).

Below is what I have tried to improve the performance of the pandas udf, but
none of it worked:
1. Repartition before groupBy. For example, df.repartition(140,
"id").groupBy("id").apply(pd_udf); 140 is the same as
spark.sql.shuffle.partitions.
I hoped the groupBy could benefit from the repartition, but according to the
execution plan the repartition seems to be ignored since groupBy does its own
partitioning.

2. although this slowness is more likely caused by pandas udf instead of
groupby, I still played with shuffle settings such as
spark.shuffle.compress=True,
spark.shuffle.spill.compress=True.

3. I played with serDe settings such as
spark.serializer=org.apache.spark.serializer.KryoSerializer.
Also I tried pyarrow settings such as spark.sql.execution.arrow.enabled=True
and spark.sql.execution.arrow.maxRecordsPerBatch=10

4. I tried to replace the solution of "groupby + pandas udf " with
combineByKey, reduceByKey, repartition + mapPartition. But it is not easy
since the pandas udf has complicated logic.

My questions:

1. Why is the pandas udf so slow?
2. Is there a way to improve the performance of pandas_udf?
3. In case this is a known issue of pandas udf, what other remedy can I use?
I guess I need to think harder about combineByKey, reduceByKey, and repartition +
mapPartitions, but I want to know if I missed anything obvious.

Any clue is highly appreciated.

Thanks
Leon


Re: Serialization or internal functions?

2020-04-05 Thread Som Lima
If you want to measure optimisation in terms of time taken, then here is
an idea :)


public class MyClass {
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();

        // replace with your add-column code (use enough data to measure)
        Thread.sleep(5000);

        long end = System.currentTimeMillis();
        long timeTaken = end - start;

        System.out.println("Time taken (ms): " + timeTaken);
    }
}

On Sat, 4 Apr 2020, 19:07 ,  wrote:

> Dear Community,
>
>
>
> Recently, I had to solve the following problem: “for every entry of a
> Dataset[String], concat a constant value”. To solve it, I used
> built-in functions:
>
>
>
> val data = Seq("A","b","c").toDS
>
>
>
> scala> data.withColumn("valueconcat",concat(col(data.columns.head),lit("
> "),lit("concat"))).select("valueconcat").explain()
>
> == Physical Plan ==
>
> LocalTableScan [valueconcat#161]
>
>
>
> As an alternative, a much simpler version of the program is to use map,
> but it adds a serialization step that does not seem to be present for the
> version above:
>
>
>
> scala> data.map(e=> s"$e concat").explain
>
> == Physical Plan ==
>
> *(1) SerializeFromObject [staticinvoke(class
> org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0,
> java.lang.String, true], true, false) AS value#92]
>
> +- *(1) MapElements , obj#91: java.lang.String
>
>+- *(1) DeserializeToObject value#12.toString, obj#90: java.lang.String
>
>   +- LocalTableScan [value#12]
>
>
>
> Is this over-optimization or is this the right way to go?
>
>
>
> As a follow up , is there any better API to get the one and only column
> available in a DataSet[String] when using built-in functions?
> “col(data.columns.head)” works but it is not ideal.
>
>
>
> Thanks!
>