Insertable records in Datasource v2.

2021-01-13 Thread Rahul Kumar
I'm implementing the DataSource V2 API for a custom data source.

I'm trying to insert records into a temp view, in the following fashion:

insertDFWithSchema.createOrReplaceTempView(sqlView)
spark.sql(s"insert into $sqlView values (2, 'insert_record1', 200, 23000), (20001, 'insert_record2', 201, 23001)")

where insertDFWithSchema is a DataFrame loaded from the custom data source.


I end up getting the following exception:

org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable RelationV2 mydb[id#63, name#64, age#65, salary#66] (Options: [mydb.updateByKey=id,mydb.namespace=test,paths=[],mydb.set=input_data,mydb.se...), false, false;;
'InsertIntoTable RelationV2 mydb[id#63, name#64, age#65, salary#66] (Options: [mydb.updateByKey=id,mydb.namespace=test,paths=[],mydb.set=input_data,mydb.se...), false, false
+- LocalRelation [col1#88, col2#89, col3#90, col4#91]
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:643)
  ... 40 elided


In my V1 data source implementation, I had the InsertableRelation trait mixed
into my BaseRelation. In V2, I'm not sure how the same thing can be achieved.
I have also tried implementing the InsertableRelation trait in DefaultSource.
Any input would be extremely helpful.
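
For reference, here is a minimal sketch of how the write side can be wired up
in the Spark 2.4-era DataSourceV2 API (which the stack trace above appears to
be from). WriteSupport roughly plays the role InsertableRelation played on the
V1 side; all of the MyXxx class names, the empty method bodies and the
"com.example.mydb" format name below are placeholders, not the actual
implementation.

import java.util.Optional

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, WriteSupport}
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

// the DefaultSource mixes in WriteSupport next to the existing read-side traits
class DefaultSource extends DataSourceV2 with WriteSupport {
  override def createWriter(
      writeUUID: String,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Optional[DataSourceWriter] =
    Optional.of(new MyWriter(schema, options))
}

// driver-side coordinator: hands out the per-task writer factory, commits/aborts the job
class MyWriter(schema: StructType, options: DataSourceOptions) extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[InternalRow] = new MyWriterFactory(schema)
  override def commit(messages: Array[WriterCommitMessage]): Unit = { /* make the write visible */ }
  override def abort(messages: Array[WriterCommitMessage]): Unit = { /* clean up partial writes */ }
}

// serialized to executors; one DataWriter per partition attempt
class MyWriterFactory(schema: StructType) extends DataWriterFactory[InternalRow] {
  override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
    new MyDataWriter(schema)
}

case object DoneMessage extends WriterCommitMessage

// writes individual rows to the underlying store
class MyDataWriter(schema: StructType) extends DataWriter[InternalRow] {
  override def write(record: InternalRow): Unit = { /* push the row to the store */ }
  override def commit(): WriterCommitMessage = DoneMessage
  override def abort(): Unit = { /* discard anything buffered */ }
}

With that in place, appends issued through the DataFrameWriter, e.g.
insertDFWithSchema.write.format("com.example.mydb").mode(SaveMode.Append).save(),
go through createWriter. Whether the SQL INSERT INTO path over a temp view
also resolves against a RelationV2 depends on the Spark version; the
unresolved 'InsertIntoTable above suggests the analyzer in this version has no
rule for it, so the DataFrameWriter route may be the more practical workaround.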

Thanks,
Rahul 







Re: Data source v2 streaming sinks does not support Update mode

2021-01-13 Thread Eric Beabes
Ok. I will work on creating a reproducible app. Thanks.

On Wed, Jan 13, 2021 at 3:57 PM Gabor Somogyi 
wrote:

> Just reached this thread. +1 on creating a simple reproducer app, and I
> suggest creating a JIRA attaching the full driver and executor logs.
> Ping me on the JIRA and I'll pick this up right away...
>
> Thanks!
>
> G
>
>
> On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim 
> wrote:
>
>> Would you mind if I ask for a simple reproducer? Would be nice if you
>> could create a repository in Github and push the code including the build
>> script.
>>
>> Thanks in advance!
>>
>> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes 
>> wrote:
>>
>>> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>>>
>>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>>> kabhwan.opensou...@gmail.com> wrote:
>>>
 Which exact Spark version did you use? Did you make sure the version
 for Spark and the version for spark-sql-kafka artifact are the same? (I
 asked this because you've said you've used Spark 3.0 but spark-sql-kafka
 dependency pointed to 3.1.0.)

 On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes 
 wrote:

> org.apache.spark.sql.streaming.StreamingQueryException: Data source v2
> streaming sinks does not support Update mode. === Streaming Query ===
> Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
> 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} 
> Current
> Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE 
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
> Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
> sinks does not support Update mode. at
> org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
> ... 1 more
>
>
> *Please see the attached image for more information.*
>
>
> On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski 
> wrote:
>
>> Hi,
>>
>> Can you post the whole message? I'm trying to find what might be
>> causing it. A small reproducible example would be of help too. Thank you.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books 
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> 
>>
>>
>> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
>> wrote:
>>
>>> Trying to port my Spark 2.4 based (Structured) streaming application
>>> to Spark 3.0. I compiled it using the dependency given below:
>>>
>>> <dependency>
>>>   <groupId>org.apache.spark</groupId>
>>>   <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>>   <version>3.1.0</version>
>>> </dependency>
>>>
>>>
>>> Every time I run it under Spark 3.0, I get this message: *Data
>>> source v2 streaming sinks does not support Update mode*
>>>
>>> I am using '*mapGroupsWithState*' so as per this link (
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>>> the only supported Output mode is "*Update*".
>>>
>>> My Sink is a Kafka topic so I am using this:
>>>
>>> .writeStream
>>> .format("kafka")
>>>
>>>
>>> What am I missing?
>>>
>>>
>>>
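
For the record, a compact sketch of the update-mode pipeline being discussed,
under the version-alignment assumption raised above (spark-sql-kafka-0-10
should match the Spark runtime, e.g. both 3.0.x or both 3.1.x). Broker
addresses, topic names and the checkpoint path are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

val spark = SparkSession.builder().appName("update-mode-example").getOrCreate()
import spark.implicits._

// running count per value, kept in mapGroupsWithState (only Update output mode is allowed here)
val counts = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .as[String]
  .groupByKey(v => v)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout) {
    (key: String, values: Iterator[String], state: GroupState[Long]) =>
      val total = state.getOption.getOrElse(0L) + values.size
      state.update(total)
      (key, total)
  }

// the Kafka sink expects string/binary key and value columns
val query = counts
  .selectExpr("_1 AS key", "CAST(_2 AS STRING) AS value")
  .writeStream
  .format("kafka")
  .outputMode("update")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/update-mode-example")
  .start()

query.awaitTermination()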




understanding spark shuffle file re-use better

2021-01-13 Thread Koert Kuipers
is shuffle file re-use based on identity or equality of the dataframe?

for example, if i run the exact same code twice to load data and do transforms
(joins, aggregations, etc.) but without re-using any actual dataframes,
will i still see skipped stages thanks to shuffle file re-use?
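
A concrete version of that scenario, with made-up paths and column names, and
assuming an existing SparkSession named spark (e.g. in spark-shell):

import org.apache.spark.sql.{DataFrame, SparkSession}

// the same load + join + aggregation is built twice, from two independently
// created DataFrame objects; only the code is identical, not the objects
def buildResult(spark: SparkSession): DataFrame = {
  val a = spark.read.parquet("/data/a")
  val b = spark.read.parquet("/data/b")
  a.join(b, "id").groupBy("key").count()
}

buildResult(spark).collect()  // first run: shuffles for the join and the aggregation
buildResult(spark).collect()  // second run: identical plan, but fresh DataFrames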

thanks!
koert


Spark Event Log Forwarding and Offset Tracking

2021-01-13 Thread raymond.tan
Hello, I am new to Spark and am trying to add some monitoring for Spark
applications, specifically to handle the situations below.

1 - Forwarding Spark event logs to identify critical events like job start,
executor failures, job failures, etc. to ElasticSearch via log4j. However, I
could not find any way to forward the event log via log4j configuration. Is
there any other recommended approach to track these application events?

2 - For Spark streaming jobs, is there any way to identify that data from
Kafka is not being consumed for whatever reason, or that the offsets are not
progressing as expected, and also forward that to ElasticSearch via log4j
for monitoring?

Thanks,
Raymond
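
One possible starting point for both items, sketched with Spark's listener
APIs. The logger names, message formats and the idea of routing them to
ElasticSearch through a log4j appender are assumptions, not an established
recipe.

import org.apache.log4j.Logger
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// 1 - application-level events (job start/end, executor loss) logged through log4j,
//     so an ElasticSearch-bound appender configured for "spark.monitoring" picks them up
class MonitoringListener extends SparkListener {
  private val log = Logger.getLogger("spark.monitoring")

  override def onJobStart(event: SparkListenerJobStart): Unit =
    log.info(s"job_start id=${event.jobId} stages=${event.stageInfos.size}")

  override def onJobEnd(event: SparkListenerJobEnd): Unit =
    log.info(s"job_end id=${event.jobId} result=${event.jobResult}")

  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit =
    log.warn(s"executor_removed id=${event.executorId} reason=${event.reason}")
}

// 2 - per-batch progress of streaming queries, including source (Kafka) start/end offsets,
//     so stalled offsets show up as missing or unchanged progress lines
class OffsetProgressListener extends StreamingQueryListener {
  private val log = Logger.getLogger("spark.monitoring.streaming")

  override def onQueryStarted(event: QueryStartedEvent): Unit =
    log.info(s"query_started id=${event.id} name=${event.name}")

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    val offsets = p.sources
      .map(s => s"${s.description}: start=${s.startOffset} end=${s.endOffset}")
      .mkString("; ")
    log.info(s"query_progress id=${p.id} inputRows=${p.numInputRows} sources=[$offsets]")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    log.error(s"query_terminated id=${event.id} exception=${event.exception}")
}

// registration (the SparkListener could also be set via the spark.extraListeners config)
val spark = SparkSession.builder().appName("monitored-app").getOrCreate()
spark.sparkContext.addSparkListener(new MonitoringListener)
spark.streams.addListener(new OffsetProgressListener)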




Connection to Presto via Spark

2021-01-13 Thread Vineet Mishra
Hi,

I am trying to connect to Presto via the Spark shell using the following
connection string; however, I end up with an exception.

-bash-4.2$ spark-shell --driver-class-path com.facebook.presto.jdbc.PrestoDriver --jars presto-jdbc-0.221.jar

scala> val presto_df = sqlContext.read.format("jdbc").option("url", "jdbc:presto://presto-prd.url.com:8443/hive/xyz").option("dbtable","testTable").option("driver","com.facebook.presto.jdbc.PrestoDriver").load()
java.sql.SQLException: Unrecognized connection property 'url'
at com.facebook.presto.jdbc.PrestoDriverUri.validateConnectionProperties(PrestoDriverUri.java:316)
at com.facebook.presto.jdbc.PrestoDriverUri.<init>(PrestoDriverUri.java:95)
at com.facebook.presto.jdbc.PrestoDriverUri.<init>(PrestoDriverUri.java:85)
at com.facebook.presto.jdbc.PrestoDriver.connect(PrestoDriver.java:87)
at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:61)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:52)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:120)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:91)
at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:57)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)

Upon removing the url option from the above string I am getting the
following exception,

scala> val presto_df = sqlContext.read.format("jdbc").option("uri", "jdbc:presto://presto-prd.url.com:8443/hive/xyz").option("dbtable","testTable").option("driver","com.facebook.presto.jdbc.PrestoDriver").load()
java.lang.RuntimeException: Option 'url' not specified
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource$$anonfun$1.apply(DefaultSource.scala:33)
at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource$$anonfun$1.apply(DefaultSource.scala:33)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at org.apache.spark.sql.execution.datasources.CaseInsensitiveMap.getOrElse(ddl.scala:150)
at org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:33)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)

It would be great if someone could help here!

Thanks!
VM


Re: Data source v2 streaming sinks does not support Update mode

2021-01-13 Thread Gabor Somogyi
Just reached this thread. +1 on creating a simple reproducer app, and I
suggest creating a JIRA attaching the full driver and executor logs.
Ping me on the JIRA and I'll pick this up right away...

Thanks!

G


On Wed, Jan 13, 2021 at 8:54 AM Jungtaek Lim 
wrote:

> Would you mind if I ask for a simple reproducer? Would be nice if you
> could create a repository in Github and push the code including the build
> script.
>
> Thanks in advance!
>
> On Wed, Jan 13, 2021 at 3:46 PM Eric Beabes 
> wrote:
>
>> I tried both. First tried 3.0.0. That didn't work so I tried 3.1.0.
>>
>> On Wed, Jan 13, 2021 at 11:35 AM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> Which exact Spark version did you use? Did you make sure the version for
>>> Spark and the version for spark-sql-kafka artifact are the same? (I asked
>>> this because you've said you've used Spark 3.0 but spark-sql-kafka
>>> dependency pointed to 3.1.0.)
>>>
>>> On Tue, Jan 12, 2021 at 11:04 PM Eric Beabes 
>>> wrote:
>>>
 org.apache.spark.sql.streaming.StreamingQueryException: Data source v2
 streaming sinks does not support Update mode. === Streaming Query ===
 Identifier: [id = 1f342043-29de-4381-bc48-1c6eef99232e, runId =
 62410f05-db59-4026-83fc-439a79b01c29] Current Committed Offsets: {} Current
 Available Offsets: {} Current State: INITIALIZING Thread State: RUNNABLE at
 org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
 at
 org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
 Caused by: java.lang.IllegalArgumentException: Data source v2 streaming
 sinks does not support Update mode. at
 org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:635)
 at
 org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:130)
 at
 org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
 at 
 org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:320)
 ... 1 more


 *Please see the attached image for more information.*


 On Tue, Jan 12, 2021 at 6:01 PM Jacek Laskowski 
 wrote:

> Hi,
>
> Can you post the whole message? I'm trying to find what might be
> causing it. A small reproducible example would be of help too. Thank you.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
>
> 
>
>
> On Tue, Jan 12, 2021 at 6:35 AM Eric Beabes 
> wrote:
>
>> Trying to port my Spark 2.4 based (Structured) streaming application
>> to Spark 3.0. I compiled it using the dependency given below:
>>
>> <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
>>   <version>3.1.0</version>
>> </dependency>
>>
>>
>> Every time I run it under Spark 3.0, I get this message: *Data
>> source v2 streaming sinks does not support Update mode*
>>
>> I am using '*mapGroupsWithState*' so as per this link (
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes),
>> the only supported Output mode is "*Update*".
>>
>> My Sink is a Kafka topic so I am using this:
>>
>> .writeStream
>> .format("kafka")
>>
>>
>> What am I missing?
>>
>>
>>
>>>
>>>