Re: Pyspark: Issue using sql in foreachBatch sink

2020-08-03 Thread muru
Thanks Jungtaek for your help.
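
For the record, a minimal sketch of the handler with that fix applied (same view
name, query, and output path as in the example quoted below); the only change is
calling write() with parentheses so py4j resolves it as a JVM method call:

def process_data(bdf, bid):
  # JVM SparkSession backing this micro-batch DataFrame
  lspark = bdf._jdf.sparkSession()
  bdf.createOrReplaceTempView("tbl")
  outdf = lspark.sql("select action, count(*) as count from tbl "
                     "where date='2020-06-20' group by 1")
  outdf.printSchema()
  outdf.show()
  # write() returns the JVM DataFrameWriter, so format()/save() resolve as
  # JVM method calls instead of failing on a JavaMember attribute
  outdf.coalesce(1).write().format("csv").save("/tmp/agg1")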

On Fri, Jul 31, 2020 at 6:31 PM Jungtaek Lim  wrote:

> Python doesn't allow omitting the parentheses on a no-argument method call,
> whereas Scala does. Use `write()`, not `write`.
>
> On Wed, Jul 29, 2020 at 9:09 AM muru  wrote:
>
>> In a PySpark Structured Streaming job, trying to use SQL instead of the SQL
>> functions API in a foreachBatch sink
>> throws an AttributeError: 'JavaMember' object has no attribute 'format'
>> exception.
>> However, the same thing works in the Scala API.
>>
>> Please note, I tested with Spark 2.4.5, 2.4.6, and 3.0.0 and got the same
>> exception.
>> Is it a bug or a known issue with the PySpark implementation? I noticed that I
>> can perform other operations; only the write method fails.
>>
>> Please let me know how to fix this issue.
>>
>> See the code examples below.
>> # Spark Scala method
>> def processData(batchDF: DataFrame, batchId: Long) {
>>batchDF.createOrReplaceTempView("tbl")
>>val outdf=batchDF.sparkSession.sql("select action, count(*) as count
>> from tbl where date='2020-06-20' group by 1")
>>outdf.printSchema()
>>outdf.show
>>outdf.coalesce(1).write.format("csv").save("/tmp/agg")
>> }
>>
>> ## pyspark python method
>> def process_data(bdf, bid):
>>   lspark = bdf._jdf.sparkSession()
>>   bdf.createOrReplaceTempView("tbl")
>>   outdf=lspark.sql("select action, count(*) as count from tbl where
>> date='2020-06-20' group by 1")
>>   outdf.printSchema()
>>   # it works
>>   outdf.show()
>>   # throws AttributeError: 'JavaMember' object has no attribute 'format'
>> exception
>>   outdf.coalesce(1).write.format("csv").save("/tmp/agg1")
>>
>> Here is the full exception
>> 20/07/24 16:31:24 ERROR streaming.MicroBatchExecution: Query [id =
>> 854a39d0-b944-4b52-bf05-cacf998e2cbd, runId =
>> e3d4dc7d-80e1-4164-8310-805d7713fc96] terminated with error
>> py4j.Py4JException: An exception was raised by the Python Proxy. Return
>> Message: Traceback (most recent call last):
>>   File
>> "/Users/muru/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>> line 2381, in _call_proxy
>> return_value = getattr(self.pool[obj_id], method)(*params)
>>   File "/Users/muru/spark/python/pyspark/sql/utils.py", line 191, in call
>> raise e
>> AttributeError: 'JavaMember' object has no attribute 'format'
>> at py4j.Protocol.getReturnValue(Protocol.java:473)
>> at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
>> at com.sun.proxy.$Proxy20.call(Unknown Source)
>> at
>> org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
>> at
>> org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
>> at
>> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
>> at
>> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
>> at
>> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>> at
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org
>> $apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>> at
>> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>> at
>> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>> at
>> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>> at
>> 

Re: What is an "analytics engine"?

2020-08-03 Thread tianlangstudio
Hello, Sir
"Engine" means that Spark has the capability to process data, but it must be
combined with other components to build a data platform.

A data platform is like a car, and Spark is like the motor.
I may be wrong, though.

 
TianlangStudio
Some of the biggest lies: I will start tomorrow/Others are better than me/I am 
not good enough/I don't have time/This is the way I am
 


--
From: Boris Gershfield 
Sent: Monday, August 3, 2020 17:04
To: apache 
Cc: user ; webmaster 
Subject: What is an "analytics engine"?

Hi,

I'm new to Apache Spark and am trying to write an essay about Big Data 
platforms.

On the Apache Spark homepage we are told that "Apache Spark™ is a unified 
analytics engine for large-scale data processing".

I don't fully understand the meaning of "engine", nor can I find a standard
definition of this term.

Could you please explain this term as you understand it?

Many thanks for your help.

Boris 


RE: DataSource API v2 & Spark-SQL

2020-08-03 Thread Lavelle, Shawn
Thanks for clarifying, Russell.  Is a Spark native catalog reference on the
roadmap for DSv2, or should I be trying to use something else?

~ Shawn

From: Russell Spitzer [mailto:russell.spit...@gmail.com]
Sent: Monday, August 3, 2020 8:27 AM
To: Lavelle, Shawn 
Cc: user 
Subject: Re: DataSource API v2 & Spark-SQL


That's a bad error message. Basically you can't make a Spark native catalog 
reference for a DSv2 source. You have to use that Datasource's catalog or use 
the programmatic API. Both DSv1 and DSv2 programmatic APIs work (plus or minus 
some options).

On Mon, Aug 3, 2020, 7:28 AM Lavelle, Shawn  wrote:
Hello Spark community,
   I have a custom datasource in v1 API that I’m trying to port to v2 API, in 
Java.  Currently I have a DataSource registered via catalog.createTable(name, 
, schema, options map).  When trying to do this in data source API v2, 
I get an error saying my class (package) isn't a valid data source. Can you help 
me out?

Spark versions are 3.0.0 w/scala 2.12, artifacts are Spark-core, spark-sql, 
spark-hive, spark-hive-thriftserver, spark-catalyst

Here’s what the dataSource definition:  public class LogTableSource implements  
TableProvider,  SupportsRead,  DataSourceRegister, Serializable

I’m guessing that I am missing one of the required interfaces. Note, I did try 
this using the LogTableSource below as "DefaultSource", but the behavior is 
the same.  Also, I keep reading about a DataSourceV2 Marker Interface, but it 
seems deprecated?

Also, I tried to add DataSourceV2ScanRelation but that won’t compile:
Output() in DataSourceV2ScanRelation cannot override Output() in QueryPlan 
return type Seq is not compatible with Seq

  I’m fairly stumped – everything I’ve read online says there’s a marker 
interface of some kind and yet I can’t find it in my package list.

  Looking forward to hearing from you,

~ Shawn





[OSI]
Shawn Lavelle

Software Development

4101 Arrowhead Drive
Medina, Minnesota 55340-9457
Phone: 763 551 0559
Email: shawn.lave...@osii.com
Website: www.osii.com


Re: CVE-2020-9480: Apache Spark RCE vulnerability in auth-enabled standalone master

2020-08-03 Thread Sean Owen
I'm resending this CVE from several months ago to user@ and dev@, as
we understand that a tool to exploit it may be released soon.

The most straightforward mitigation for those that are affected (using
the standalone master, where spark.authenticate is necessary) is to
update to 2.4.6 or 3.0.0+.
For those using vendor distros, you may want to check with your vendor
about whether the relevant patch has been applied.
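
For context, a minimal sketch of the affected configuration as seen from an
application (the master URL and secret below are placeholders; on a standalone
cluster the same secret also has to be configured on the master and workers):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        .setMaster("spark://master-host:7077")           # standalone master (placeholder)
        .set("spark.authenticate", "true")               # enable shared-secret auth
        .set("spark.authenticate.secret", "change-me"))  # placeholder secret
spark = SparkSession.builder.config(conf=conf).getOrCreate()

As the CVE notes, prior to 2.4.6 a specially crafted RPC to the master could
bypass this check, so the shared secret alone is not sufficient protection.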

Sean


On Mon, Jun 22, 2020 at 4:49 PM Sean Owen  wrote:
>
> Severity: Important
>
> Vendor: The Apache Software Foundation
>
> Versions Affected:
> Apache Spark 2.4.5 and earlier
>
> Description:
> In Apache Spark 2.4.5 and earlier, a standalone resource manager's master may
> be configured to require authentication (spark.authenticate) via a
> shared secret. When enabled, however, a specially-crafted RPC to the
> master can succeed in starting an application's resources on the Spark
> cluster, even without the shared key. This can be leveraged to execute
> shell commands on the host machine.
>
> This does not affect Spark clusters using other resource managers
> (YARN, Mesos, etc).
>
>
> Mitigation:
> Users should update to Spark 2.4.6 or 3.0.0.
> Where possible, network access to the cluster machines should be
> restricted to trusted hosts only.
>
> Credit:
> Ayoub Elaassal
>
> References:
> https://spark.apache.org/security.html




Re: [Spark SQL]: Can't write DataFrame after using explode function on multiple columns.

2020-08-03 Thread Henrique Oliveira
Thank you for both tips; I will definitely try the pandas_udfs. Regarding
changing the select operation: it's not possible to have multiple explode
functions in the same select; sadly, they must be applied one at a time.

On Mon, Aug 3, 2020 at 11:41 AM, Patrick McCarthy <pmccar...@dstillery.com> wrote:

> If you use pandas_udfs in 2.4 they should be quite performant (or at least
> won't suffer serialization overhead), might be worth looking into.
>
> I didn't run your code but one consideration is that the while loop might
> be making the DAG a lot bigger than it has to be. You might see if defining
> those columns with list comprehensions forming a single select() statement
> makes for a smaller DAG.
>
> On Mon, Aug 3, 2020 at 10:06 AM Henrique Oliveira 
> wrote:
>
>> Hi Patrick, thank you for your quick response.
>> That's exactly what I think. Actually, the result of this processing is
>> an intermediate table that is going to be used for other views generation.
>> Another approach I'm trying now is to move the "explosion" step to this
>> "view generation" step; this way I don't need to explode every column but
>> just those used for the final client.
>>
>> PS: I was avoiding UDFs for now because I'm still on Spark 2.4 and the
>> Python UDFs I tried had very bad performance, but I will give it a try in
>> this case. It can't be worse.
>> Thanks again!
>>
>> On Mon, Aug 3, 2020 at 10:53 AM, Patrick McCarthy <pmccar...@dstillery.com> wrote:
>>
>>> This seems like a very expensive operation. Why do you want to write out
>>> all the exploded values? If you just want all combinations of values, could
>>> you instead do it at read-time with a UDF or something?
>>>
>>> On Sat, Aug 1, 2020 at 8:34 PM hesouol  wrote:
>>>
 I forgot to add a piece of information. By "can't write" I mean it keeps
 processing
 and nothing happens. The job runs for hours even with a very small file
 and
 I have to force the stoppage.



 --
 Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/



>>>
>>> --
>>>
>>>
>>> *Patrick McCarthy  *
>>>
>>> Senior Data Scientist, Machine Learning Engineering
>>>
>>> Dstillery
>>>
>>> 470 Park Ave South, 17th Floor, NYC 10016
>>>
>>
>
> --
>
>
> *Patrick McCarthy  *
>
> Senior Data Scientist, Machine Learning Engineering
>
> Dstillery
>
> 470 Park Ave South, 17th Floor, NYC 10016
>


Re: [Spark SQL]: Can't write DataFrame after using explode function on multiple columns.

2020-08-03 Thread Patrick McCarthy
If you use pandas_udfs in 2.4 they should be quite performant (or at least
won't suffer serialization overhead), might be worth looking into.

I didn't run your code but one consideration is that the while loop might
be making the DAG a lot bigger than it has to be. You might see if defining
those columns with list comprehensions forming a single select() statement
makes for a smaller DAG.
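
For illustration, a minimal sketch of the single-select pattern (the column
names and the cast are made up, standing in for whatever per-column
transformation the loop applies):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "2", "3")], ["id", "a", "b"])
cols_to_convert = ["a", "b"]

# Instead of growing the plan one column at a time inside a loop
# (df = df.withColumn(c, ...) for each c), build every expression up front
# and apply them in a single select(), which keeps the DAG shallower.
exprs = [F.col(c).cast("int").alias(c) if c in cols_to_convert else F.col(c)
         for c in df.columns]
df = df.select(*exprs)
df.printSchema()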

On Mon, Aug 3, 2020 at 10:06 AM Henrique Oliveira  wrote:

> Hi Patrick, thank you for your quick response.
> That's exactly what I think. Actually, the result of this processing is an
> intermediate table that is going to be used for other views generation.
> Another approach I'm trying now is to move the "explosion" step to this
> "view generation" step; this way I don't need to explode every column but
> just those used for the final client.
>
> PS: I was avoiding UDFs for now because I'm still on Spark 2.4 and the
> Python UDFs I tried had very bad performance, but I will give it a try in
> this case. It can't be worse.
> Thanks again!
>
> On Mon, Aug 3, 2020 at 10:53 AM, Patrick McCarthy <pmccar...@dstillery.com> wrote:
>
>> This seems like a very expensive operation. Why do you want to write out
>> all the exploded values? If you just want all combinations of values, could
>> you instead do it at read-time with a UDF or something?
>>
>> On Sat, Aug 1, 2020 at 8:34 PM hesouol  wrote:
>>
>>> I forgot to add a piece of information. By "can't write" I mean it keeps
>>> processing
>>> and nothing happens. The job runs for hours even with a very small file
>>> and
>>> I have to force the stoppage.
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>>
>>>
>>
>> --
>>
>>
>> *Patrick McCarthy  *
>>
>> Senior Data Scientist, Machine Learning Engineering
>>
>> Dstillery
>>
>> 470 Park Ave South, 17th Floor, NYC 10016
>>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: [Spark SQL]: Can't write DataFrame after using explode function on multiple columns.

2020-08-03 Thread Henrique Oliveira
Hi Patrick, thank you for your quick response.
That's exactly what I think. Actually, the result of this processing is an
intermediate table that is going to be used for other views generation.
Another approach I'm trying now is to move the "explosion" step to this
"view generation" step; this way I don't need to explode every column but
just those used for the final client.

PS: I was avoiding UDFs for now because I'm still on Spark 2.4 and the
Python UDFs I tried had very bad performance, but I will give it a try in
this case. It can't be worse.
Thanks again!

On Mon, Aug 3, 2020 at 10:53 AM, Patrick McCarthy <pmccar...@dstillery.com> wrote:

> This seems like a very expensive operation. Why do you want to write out
> all the exploded values? If you just want all combinations of values, could
> you instead do it at read-time with a UDF or something?
>
> On Sat, Aug 1, 2020 at 8:34 PM hesouol  wrote:
>
>> I forgot to add a piece of information. By "can't write" I mean it keeps
>> processing
>> and nothing happens. The job runs for hours even with a very small file
>> and
>> I have to force the stoppage.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>>
>>
>
> --
>
>
> *Patrick McCarthy  *
>
> Senior Data Scientist, Machine Learning Engineering
>
> Dstillery
>
> 470 Park Ave South, 17th Floor, NYC 10016
>


Re: [Spark SQL]: Can't write DataFrame after using explode function on multiple columns.

2020-08-03 Thread Patrick McCarthy
This seems like a very expensive operation. Why do you want to write out
all the exploded values? If you just want all combinations of values, could
you instead do it at read-time with a UDF or something?

On Sat, Aug 1, 2020 at 8:34 PM hesouol  wrote:

> I forgot to add a piece of information. By "can't write" I mean it keeps processing
> and nothing happens. The job runs for hours even with a very small file and
> I have to force the stoppage.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>

-- 


*Patrick McCarthy  *

Senior Data Scientist, Machine Learning Engineering

Dstillery

470 Park Ave South, 17th Floor, NYC 10016


Re: DataSource API v2 & Spark-SQL

2020-08-03 Thread Russell Spitzer
That's a bad error message. Basically you can't make a Spark native catalog
reference for a DSv2 source. You have to use that Datasource's catalog or
use the programmatic API. Both DSv1 and DSv2 programmatic APIs work (plus
or minus some options).
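
To make the programmatic route concrete, a minimal sketch in PySpark (for
brevity); the "logtable" short name and the option are assumptions, standing in
for whatever LogTableSource actually registers via DataSourceRegister (the
fully qualified class name works as well):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.read
      .format("logtable")                 # or the provider's class name (hypothetical)
      .option("someOption", "someValue")  # hypothetical source option
      .load())

# SQL access without a catalog entry is still possible through a temp view:
df.createOrReplaceTempView("log_table")
spark.sql("SELECT * FROM log_table LIMIT 10").show()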

On Mon, Aug 3, 2020, 7:28 AM Lavelle, Shawn  wrote:

> Hello Spark community,
>
>I have a custom datasource in v1 API that I’m trying to port to v2 API,
> in Java.  Currently I have a DataSource registered via
> catalog.createTable(name, , schema, options map).  When trying to
> do this in data source API v2, I get an error saying my class (package)
> isn’t a valid data source. Can you help me out?
>
>
>
> Spark versions are 3.0.0 w/scala 2.12, artifacts are Spark-core,
> spark-sql, spark-hive, spark-hive-thriftserver, spark-catalyst
>
>
>
> Here’s the dataSource definition:  *public class LogTableSource
> implements  TableProvider,  SupportsRead,  DataSourceRegister, Serializable*
>
>
>
> I’m guessing that I am missing one of the required interfaces. Note, I did
> try this using the LogTableSource below as “DefaultSource”, but the
> behavior is the same.  Also, I keep reading about a DataSourceV2 Marker
> Interface, but it seems deprecated?
>
>
>
> Also, I tried to add *DataSourceV2ScanRelation* but that won’t compile:
>
> Output() in DataSourceV2ScanRelation cannot override Output() in QueryPlan
> return type Seq is not compatible with Seq
>
>
>
>   I’m fairly stumped – everything I’ve read online says there’s a marker
> interface of some kind and yet I can’t find it in my package list.
>
>
>
>   Looking forward to hearing from you,
>
>
>
> ~ Shawn
>
>
>
>
>
>
>
>
> [image: OSI]
> Shawn Lavelle
>
> Software Development
>
> 4101 Arrowhead Drive
> Medina, Minnesota 55340-9457
> Phone: 763 551 0559
> *Email:* shawn.lave...@osii.com
> *Website:* www.osii.com
>


DataSource API v2 & Spark-SQL

2020-08-03 Thread Lavelle, Shawn
Hello Spark community,
   I have a custom datasource in v1 API that I'm trying to port to v2 API, in 
Java.  Currently I have a DataSource registered via catalog.createTable(name, 
, schema, options map).  When trying to do this in data source API v2, 
I get an error saying my class (package) isn't a valid data source. Can you help 
me out?

Spark versions are 3.0.0 w/scala 2.12, artifacts are Spark-core, spark-sql, 
spark-hive, spark-hive-thriftserver, spark-catalyst

Here's the dataSource definition:  public class LogTableSource implements  
TableProvider,  SupportsRead,  DataSourceRegister, Serializable

I'm guessing that I am missing one of the required interfaces. Note, I did try 
this using the LogTableSource below as "DefaultSource", but the behavior is 
the same.  Also, I keep reading about a DataSourceV2 Marker Interface, but it 
seems deprecated?

Also, I tried to add DataSourceV2ScanRelation but that won't compile:
Output() in DataSourceV2ScanRelation cannot override Output() in QueryPlan 
return type Seq is not compatible with Seq

  I'm fairly stumped - everything I've read online says there's a marker 
interface of some kind and yet I can't find it in my package list.

  Looking forward to hearing from you,

~ Shawn







[OSI]
Shawn Lavelle

Software Development

4101 Arrowhead Drive
Medina, Minnesota 55340-9457
Phone: 763 551 0559
Email: shawn.lave...@osii.com
Website: www.osii.com


What is an "analytics engine"?

2020-08-03 Thread Boris Gershfield
Hi,

I'm new to Apache Spark and am trying to write an essay about Big Data
platforms.

On the Apache Spark homepage we are told that "Apache Spark™ is a unified
analytics engine for large-scale data processing".

I don't fully understand the meaning of "engine", nor can I find a
standard definition of this term.

Could you please explain this term as you understand it?

Many thanks for your help.

Boris