Implementing Spark metric source and Sink for custom application metrics

2018-04-18 Thread AnilKumar B
Hi All,

What is the best way to instrument metrics of a Spark application from both
the driver and the executors?

I am trying to send my Spark application metrics into Kafka. I found two
approaches.

*Approach 1:* Implement a custom Source and Sink and use the Source for
instrumenting from both the driver and the executors (by using SparkEnv.get.metricsSystem).

*Approach 2:* Write a Dropwizard/Gobblin KafkaReporter and use it for
instrumentation from both the driver and the executors.

Which one is the better approach?

I tried to go with Approach 1, but when I launch my application all the
containers get killed.

The steps I followed are as below:
1. As there is no KafkaSink in org.apache.spark.metrics.sink, I have
implemented my custom KafkaSink and KafkaReporter as suggested in
https://github.com/erikerlandson/spark-kafka-sink
2. Implemented SparkMetricsSource by extending
org.apache.spark.metrics.source.Source
3. Registered the source:
   val sparkMetricsSource = new SparkMetricsSource("spark.xyz.app.prefix")
   SparkEnv.get.metricsSystem.registerSource(sparkMetricsSource)
4. Instrumented the metrics:
   sparkMetricsSource.registerGauge(sparkEnv.spark.sparkContext.applicationId,
     schema, "app-start", System.currentTimeMillis)
5. Configured the sink through Spark properties (a sketch of steps 2-4 follows
after this message).


Thanks & Regards,
B Anil Kumar.
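
For reference, a minimal sketch of what steps 2-4 above could look like. This
is illustrative only: SparkMetricsSource and registerGauge are the names used
in the steps, not a Spark API, and in some Spark versions the Source and Sink
traits are package-private, so such classes may need to live under a matching
org.apache.spark.metrics package (as the spark-kafka-sink example does).

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

// Custom source holding a Dropwizard MetricRegistry; the metrics system
// reports everything registered here to whichever sinks are configured.
class SparkMetricsSource(prefix: String) extends Source {
  override val sourceName: String = prefix
  override val metricRegistry: MetricRegistry = new MetricRegistry

  // Registers a constant gauge under "<appId>.<schema>.<name>".
  def registerGauge(appId: String, schema: String, name: String, value: Long): Unit =
    metricRegistry.register(
      MetricRegistry.name(appId, schema, name),
      new Gauge[Long] { override def getValue: Long = value })
}

// Step 5 would then point the metrics system at the custom sink, e.g. with
// --conf spark.metrics.conf.*.sink.kafka.class=org.apache.spark.metrics.sink.KafkaSink
// plus whatever broker/topic properties that sink expects.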


distributed Cholesky on Spark?

2018-04-18 Thread qifan
I'm wondering if anyone has done a distributed Cholesky decomposition on Spark.
I need to do it on a large matrix (200k x 200k), which would not fit on one
machine.

So far I've found:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/CholeskyDecomposition.scala
and SystemML, but both seem to be single-node implementations.

Any information on the issue is helpful! Thanks.








Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Arun Mahadevan
I assume it's going to compare by the first column and, if equal, compare the
second column, and so on.
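
A quick illustration of that ordering (not from the original thread; assumes a
spark-shell session where import spark.implicits._ and
import org.apache.spark.sql.functions._ have been run):

// Structs compare field by field, left to right, so max(struct(ts, v)) picks
// the largest ts and only looks at v to break ties on ts.
val demo = Seq((1, 10, "a"), (1, 10, "b"), (1, 5, "z")).toDF("id", "ts", "v")

demo.groupBy($"id")
  .agg(max(struct($"ts", $"v")).as("m"))
  .select($"id", $"m.*")
  .show()
// -> id = 1, ts = 10, v = "b"   (the tie on ts is broken by v)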

From:  kant kodali 
Date:  Wednesday, April 18, 2018 at 6:26 PM
To:  Jungtaek Lim 
Cc:  Arun Iyer , Michael Armbrust , 
Tathagata Das , "user @spark" 

Subject:  Re: can we use mapGroupsWithState in raw sql?

This is cool! Looks to me this works too

select data.* from (SELECT max(struct(my_timestamp,*)) as data from view1 group 
by id)

but I have a naive question again: what does max of a struct mean? Does it always
take the max of the first column and ignore the rest of the fields in the
struct?

Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread kant kodali
This is cool! Looks to me this works too

select data.* from (SELECT max(struct(my_timestamp,*)) as data from view1
group by id)

but I have a naive question again: what does max of a struct mean? Does it
always take the max of the first column and ignore the rest of the fields
in the struct?

Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Jungtaek Lim
Thanks Arun, I modified it a bit to try my best to avoid enumerating fields:

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.*")
  .groupBy($"ID")
  .agg(max(struct($"AMOUNT", $"*")).as("data"))
  .select($"data.*")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()

It still has a minor issue: the column "AMOUNT" shows up twice in the result
table, but everything works like a charm.

-Jungtaek Lim (HeartSaVioR)
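
One possible way around the duplicated "AMOUNT" column (a sketch, not from the
thread): nest the payload row in its own inner struct and unpack only that, so
the comparison key is not repeated. socketDF, schema and the column names are
the same as in the snippet above.

val dedupedQuery = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.*")
  .groupBy($"ID")
  // AMOUNT is only the ordering key; the full row travels inside "row"
  .agg(max(struct($"AMOUNT", struct($"*").as("row"))).as("m"))
  .select($"m.row".as("row"))
  .select($"row.*")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()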

Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Arun Mahadevan
The below expr might work:

df.groupBy($"id").agg(max(struct($"amount", 
$"my_timestamp")).as("data")).select($"id", $"data.*")

Thanks,
Arun

Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Jungtaek Lim
Thanks Michael for providing a great solution. It's great to remove the UDAF
and any need to provide fields manually.

Btw, your code has a compilation error: a ')' is missing, and after I fix it,
it complains again with another issue.

:66: error: overloaded method value max with alternatives:
  (columnName: String)org.apache.spark.sql.Column 
  (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
 cannot be applied to (org.apache.spark.sql.ColumnName,
org.apache.spark.sql.Column)

Could you check your code to see it works with Spark 2.3 (via spark-shell
or whatever)?

Thanks!
Jungtaek Lim (HeartSaVioR)

Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Michael Armbrust
You can calculate argmax using a struct.

df.groupBy($"id").agg(max($"my_timestamp",
struct($"*").as("data")).getField("data").select($"data.*")

You could transcode this to SQL, it'll just be complicated nested queries.



Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread kant kodali
Hi Arun,

I want to select the entire row with the max timestamp for each group. I
have modified my data set below to avoid any confusion.

*Input:*

id | amount | my_timestamp
---
1  |  5 |  2018-04-01T01:00:00.000Z
1  | 10 |  2018-04-01T01:10:00.000Z
1  |  6 |  2018-04-01T01:20:00.000Z
2  | 30 |  2018-04-01T01:25:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z

*Expected Output:*

id | amount | my_timestamp
---
1  | 10 |  2018-04-01T01:10:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z

Looking for a streaming solution using either raw SQL like
sparkSession.sql("sql query"), or something similar to raw SQL, but not
something like mapGroupsWithState.


Re: can we use mapGroupsWithState in raw sql?

2018-04-18 Thread Arun Mahadevan
Can't the “max” function be used here? Something like..

stream.groupBy($"id").max("amount").writeStream.outputMode(“complete”/“update")….
 

Unless the “stream” is already a grouped stream, in which case the above would 
not work since the support for multiple aggregate operations is not there yet.

Thanks,
Arun

From:  kant kodali 
Date:  Tuesday, April 17, 2018 at 11:41 AM
To:  Tathagata Das 
Cc:  "user @spark" 
Subject:  Re: can we use mapGroupsWithState in raw sql?

Hi TD, 

Thanks for that. The only reason I ask is I don't see any alternative solution 
to solve the problem below using raw sql.


How to select the max row for every group in Spark Structured Streaming 2.3.0
without using order by (since it requires complete mode) or mapGroupsWithState?

Input:
id | amount | my_timestamp
---
1  |  5 |  2018-04-01T01:00:00.000Z
1  | 10 |  2018-04-01T01:10:00.000Z
2  | 20 |  2018-04-01T01:20:00.000Z
2  | 30 |  2018-04-01T01:25:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z
Expected Output:
id | amount | my_timestamp
---
1  | 10 |  2018-04-01T01:10:00.000Z
2  | 40 |  2018-04-01T01:30:00.000Z
Looking for a streaming solution using either raw SQL like
sparkSession.sql("sql query"), or something similar to raw SQL, but not
something like mapGroupsWithState.


On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das  
wrote:
Unfortunately no. Honestly it does not make sense as for type-aware operations 
like map, mapGroups, etc., you have to provide an actual JVM function. That 
does not fit in with the SQL language structure.

On Mon, Apr 16, 2018 at 7:34 PM, kant kodali  wrote:
Hi All, 

can we use mapGroupsWithState in raw SQL? or is it in the roadmap?

Thanks!







[Spark 2.3] GLM Poisson issue

2018-04-18 Thread svattig
Has anyone run a Poisson GLM model and successfully got the
GeneralizedLinearRegressionTrainingSummary object (to access p-values,
t-values, deviances, AIC, etc.)?

I have tried to fit two datasets to compare Spark vs. R outputs. Both models
ran fine in Spark and I was able to get the coefficients back, but when I
tried to get the "GeneralizedLinearRegressionTrainingSummary" object I
always got the following error:

java.lang.NumberFormatException
  at java.math.BigDecimal.<init>(BigDecimal.java:494)
  at java.math.BigDecimal.<init>(BigDecimal.java:824)
  at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)
  at scala.math.BigDecimal$.apply(BigDecimal.scala:249)
  at
org.apache.spark.ml.regression.GeneralizedLinearRegressionTrainingSummary.org$apache$spark$ml$regression$GeneralizedLinearRegressionTrainingSummary$$round$1(GeneralizedLinearRegression.scala:1503)
  at
org.apache.spark.ml.regression.GeneralizedLinearRegressionTrainingSummary.toString(GeneralizedLinearRegression.scala:1551)
  at
scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)

Looking into the Spark 2.3 source code, GeneralizedLinearRegressionTrainingSummary's
toString method throws the error when rounding the deviances; for some reason
the deviances are NaN.
I have run the same model in Spark 2.2.0 and can get the
GeneralizedLinearRegressionTrainingSummary object fine (slightly different
code than Spark 2.3), but the deviances are also NaN.

Does anyone know why the deviances are NaN?
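
If the exception only comes from toString (the stack trace above is the
shell's stringOf call rounding a NaN deviance while printing the summary), the
individual summary fields may still be readable. A rough sketch, where
"training" is a placeholder for the fitted dataset:

import org.apache.spark.ml.regression.GeneralizedLinearRegression

val model = new GeneralizedLinearRegression()
  .setFamily("poisson")
  .setLink("log")
  .fit(training)

// Read the fields directly instead of printing the summary object, so its
// toString (which fails while rounding the NaN deviance) is never invoked.
println(model.summary.deviance)        // confirm whether this really is NaN
println(model.summary.nullDeviance)
println(model.summary.aic)
println(model.summary.pValues.mkString(", "))
println(model.summary.tValues.mkString(", "))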

Thanks,
Srikar.V  

 






Re: Why doesn't spark use broadcast join?

2018-04-18 Thread Kurt Fehlhauer
Try running AnalyzeTableCommand on both tables first.

>


Re: Why doesn't spark use broadcast join?

2018-04-18 Thread Matteo Cossu
Can you check the value for spark.sql.autoBroadcastJoinThreshold?

On 29 March 2018 at 14:41, Vitaliy Pisarev 
wrote:

> I am looking at the physical plan for the following query:
>
> SELECT f1,f2,f3,...
> FROM T1
> LEFT ANTI JOIN T2 ON T1.id = T2.id
> WHERE  f1 = 'bla'
>AND f2 = 'bla2'
>AND some_date >= date_sub(current_date(), 1)
> LIMIT 100
>
> An important detail: the table 'T1' can be very large (hundreds of
> thousands of rows), but table T2 is rather small. Maximum in the thousands.
> In this particular case, the table T2 has 2 rows.
>
> In the physical plan, I see that a SortMergeJoin is performed. Despite it
> being the perfect candidate for a broadcast join.
>
> What could be the reason for this?
> Is there a way to hint the optimizer to perform a broadcast join in the
> sql syntax?
>
> I am writing this in pyspark and the query itself is over parquets stored
> in Azure blob storage.
>
>
>
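
A sketch of the usual knobs to check (not from the thread; t1/t2 are assumed
DataFrames for the same tables, and the PySpark equivalents are analogous):

// 1. Give the optimizer size statistics so it can see that T2 is tiny
//    (ANALYZE TABLE is the SQL form of AnalyzeTableCommand):
spark.sql("ANALYZE TABLE T2 COMPUTE STATISTICS")

// 2. Make sure the broadcast threshold is not disabled (-1) or too small;
//    the default is about 10 MB:
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// 3. Or hint the join explicitly, either in SQL (Spark 2.2+) ...
spark.sql("""
  SELECT /*+ BROADCAST(T2) */ f1, f2, f3
  FROM T1
  LEFT ANTI JOIN T2 ON T1.id = T2.id
  """)

// ... or with the DataFrame API:
import org.apache.spark.sql.functions.broadcast
val result = t1.join(broadcast(t2), t1("id") === t2("id"), "left_anti")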



"not in" sql spend a lot of time

2018-04-18 Thread 崔苗
Hi,
When I execute SQL like this:
"select * from onlineDevice where deviceId not in (select deviceId from
historyDevice)"
I found the task spends a lot of time (over 40 min). I stopped the task, but I
couldn't find the reason in the Spark history UI.
The historyDevice and onlineDevice tables only contain about 3 million records.

spark-submit:
  --master yarn --deploy-mode client --driver-memory 8G --num-executors 2 
--executor-memory 9G --executor-cores 6
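
One thing worth checking (not from the thread): NOT IN over a subquery has
null-aware semantics, so Spark may plan it as a nested-loop anti join, which
can be extremely slow even on a few million rows. If deviceId is never NULL,
an explicit anti join (or NOT EXISTS) usually produces a much faster plan:

// Equivalent result when deviceId contains no NULLs, but planned as a regular
// (hash/sort-merge) anti join instead of a null-aware nested-loop join.
val onlineOnly = spark.sql(
  """SELECT o.*
    |FROM onlineDevice o
    |LEFT ANTI JOIN historyDevice h
    |  ON o.deviceId = h.deviceId""".stripMargin)

onlineOnly.show(20)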