Implementing Spark metric source and Sink for custom application metrics
Hi all,

What is the best way to instrument metrics for a Spark application from both the driver and the executors? I am trying to send my Spark application metrics to Kafka, and I have found two approaches.

Approach 1: Implement a custom Source and Sink, and use the Source for instrumenting from both the driver and the executors (via SparkEnv.get.metricsSystem).

Approach 2: Write a dropwizard/gobblin KafkaReporter and use it for instrumentation from both the driver and the executors.

Which is the better approach?

I tried to go with Approach 1, but when I launch my application all the containers get killed. The steps I took are as follows:

1. As there is no KafkaSink in org.apache.spark.metrics.sink, I implemented my own custom KafkaSink and KafkaReporter as suggested in https://github.com/erikerlandson/spark-kafka-sink
2. Implemented SparkMetricsSource by extending org.apache.spark.metrics.source.Source
3. Registered the source:
   val sparkMetricsSource = new SparkMetricsSource("spark.xyz.app.prefix")
   SparkEnv.get.metricsSystem.registerSource(sparkMetricsSource)
4. Instrumented the metrics:
   sparkMetricsSource.registerGauge(sparkEnv.spark.sparkContext.applicationId, schema, "app-start", System.currentTimeMillis)
5. Configured the sink through Spark properties.

Thanks & Regards,
B Anil Kumar.
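For reference, a minimal sketch of what steps 2 and 3 might look like. This is an assumption about the setup, not the poster's actual code: the class name SparkMetricsSource and the registerGauge helper are taken from the mail above, and the class is placed under an org.apache.spark package because the Source trait is package-private in some Spark versions.

  package org.apache.spark.metrics.source

  import com.codahale.metrics.{Gauge, MetricRegistry}

  class SparkMetricsSource(prefix: String) extends Source {
    override val sourceName: String = prefix
    override val metricRegistry: MetricRegistry = new MetricRegistry

    // hypothetical helper matching the usage described in the mail above
    def registerGauge(appId: String, schema: String, name: String, value: Long): Unit =
      metricRegistry.register(
        MetricRegistry.name(appId, schema, name),
        new Gauge[Long] { override def getValue: Long = value })
  }

Usage on the driver (and similarly from executor-side code) would then look roughly like:

  val sparkMetricsSource = new SparkMetricsSource("spark.xyz.app.prefix")
  SparkEnv.get.metricsSystem.registerSource(sparkMetricsSource)
  sparkMetricsSource.registerGauge(appId, schema, "app-start", System.currentTimeMillis)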
distributed Cholesky on Spark?
I'm wondering if anyone has done a distributed Cholesky decomposition on Spark. I need to do it on a large matrix (200k x 200k) that would not fit on one machine. So far I've found https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/CholeskyDecomposition.scala and SystemML, but both seem to be single-node implementations. Any information on the issue is helpful! Thanks.
Re: can we use mapGroupsWithState in raw sql?
I assume it's going to compare by the first column and, if equal, compare the second column, and so on.

From: kant kodali
Date: Wednesday, April 18, 2018 at 6:26 PM
To: Jungtaek Lim
Cc: Arun Iyer, Michael Armbrust, Tathagata Das, "user @spark"
Subject: Re: can we use mapGroupsWithState in raw sql?

This is cool! Looks to me this works too:

select data.* from (SELECT max(struct(my_timestamp,*)) as data from view1 group by id)

but I have a naive question again: what does max of a struct mean? Does it always take the max of the first column and ignore the rest of the fields in the struct?
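As a side note on the struct-ordering question, a small spark-shell sketch (illustrative only, not from the thread) shows the behaviour: structs compare field by field from left to right, so the first field dominates and later fields only break ties.

  import org.apache.spark.sql.functions.{max, struct}
  import spark.implicits._

  val df = Seq((1, 5, "a"), (1, 5, "z"), (1, 9, "b")).toDF("id", "amount", "tag")

  // "amount" is compared first; "tag" only matters when amounts are equal
  df.groupBy($"id")
    .agg(max(struct($"amount", $"tag")).as("data"))
    .select($"id", $"data.*")
    .show()
  // returns (1, 9, "b"): 9 beats 5, so "z" never gets a say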
Re: can we use mapGroupsWithState in raw sql?
This is cool! Looks to me this works too:

select data.* from (SELECT max(struct(my_timestamp,*)) as data from view1 group by id)

but I have a naive question again: what does max of a struct mean? Does it always take the max of the first column and ignore the rest of the fields in the struct?

On Wed, Apr 18, 2018 at 6:06 PM, Jungtaek Lim wrote:
> Thanks Arun, I modified a bit to try my best to avoid enumerating fields:
Re: can we use mapGroupsWithState in raw sql?
Thanks Arun, I modified a bit to try my best to avoid enumerating fields:

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.*")
  .groupBy($"ID")
  .agg(max(struct($"AMOUNT", $"*")).as("data"))
  .select($"data.*")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()

It still has a minor issue: the column "AMOUNT" shows up twice in the result table, but everything works like a charm.

-Jungtaek Lim (HeartSaVioR)

On Thu, Apr 19, 2018 at 9:43 AM, Arun Mahadevan wrote:
> The below expr might work:
>
> df.groupBy($"id").agg(max(struct($"amount", $"my_timestamp")).as("data")).select($"id", $"data.*")
Re: can we use mapGroupsWithState in raw sql?
The below expr might work:

df.groupBy($"id").agg(max(struct($"amount", $"my_timestamp")).as("data")).select($"id", $"data.*")

Thanks,
Arun

From: Jungtaek Lim
Date: Wednesday, April 18, 2018 at 4:54 PM
Subject: Re: can we use mapGroupsWithState in raw sql?

Thanks Michael for providing a great solution. Great to remove the UDAF and any need to provide fields manually.
Re: can we use mapGroupsWithState in raw sql?
Thanks Michael for providing a great solution. Great to remove the UDAF and any need to provide fields manually.

Btw, your code has a compilation error: a ')' is missing, and after I fix it, it complains again with another issue:

:66: error: overloaded method value max with alternatives:
  (columnName: String)org.apache.spark.sql.Column
  (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
 cannot be applied to (org.apache.spark.sql.ColumnName, org.apache.spark.sql.Column)

Could you check your code to see whether it works with Spark 2.3 (via spark-shell or whatever)?

Thanks!
Jungtaek Lim (HeartSaVioR)

On Thu, Apr 19, 2018 at 8:34 AM, Michael Armbrust wrote:
> You can calculate argmax using a struct.
Re: can we use mapGroupsWithState in raw sql?
You can calculate argmax using a struct.

df.groupBy($"id").agg(max($"my_timestamp", struct($"*").as("data")).getField("data").select($"data.*")

You could transcode this to SQL, it'll just be complicated nested queries.

On Wed, Apr 18, 2018 at 3:40 PM, kant kodali wrote:
> Hi Arun,
>
> I want to select the entire row with the max timestamp for each group. I have modified my data set below to avoid any confusion.
Re: can we use mapGroupsWithState in raw sql?
Hi Arun,

I want to select the entire row with the max timestamp for each group. I have modified my data set below to avoid any confusion.

Input:

id | amount | my_timestamp
---
1 | 5  | 2018-04-01T01:00:00.000Z
1 | 10 | 2018-04-01T01:10:00.000Z
1 | 6  | 2018-04-01T01:20:00.000Z
2 | 30 | 2018-04-01T01:25:00.000Z
2 | 40 | 2018-04-01T01:30:00.000Z

Expected Output:

id | amount | my_timestamp
---
1 | 10 | 2018-04-01T01:10:00.000Z
2 | 40 | 2018-04-01T01:30:00.000Z

Looking for a streaming solution using either raw sql like sparkSession.sql("sql query") or something similar to raw sql, but not something like mapGroupsWithState.

On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan wrote:
> Can't the "max" function be used here? Something like..
Re: can we use mapGroupsWithState in raw sql?
Can't the "max" function be used here? Something like..

stream.groupBy($"id").max("amount").writeStream.outputMode("complete"/"update")....

Unless the "stream" is already a grouped stream, in which case the above would not work, since support for multiple aggregate operations is not there yet.

Thanks,
Arun

From: kant kodali
Date: Tuesday, April 17, 2018 at 11:41 AM
To: Tathagata Das
Cc: "user @spark"
Subject: Re: can we use mapGroupsWithState in raw sql?

Hi TD,

Thanks for that. The only reason I ask is that I don't see any alternative solution to the problem below using raw sql:

How to select the max row for every group in Spark Structured Streaming 2.3.0 without using order by (since it requires complete mode) or mapGroupsWithState?

Input:

id | amount | my_timestamp
---
1 | 5  | 2018-04-01T01:00:00.000Z
1 | 10 | 2018-04-01T01:10:00.000Z
2 | 20 | 2018-04-01T01:20:00.000Z
2 | 30 | 2018-04-01T01:25:00.000Z
2 | 40 | 2018-04-01T01:30:00.000Z

Expected Output:

id | amount | my_timestamp
---
1 | 10 | 2018-04-01T01:10:00.000Z
2 | 40 | 2018-04-01T01:30:00.000Z

Looking for a streaming solution using either raw sql like sparkSession.sql("sql query") or something similar to raw sql, but not something like mapGroupsWithState.

On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das wrote:
> Unfortunately no. Honestly it does not make sense, as for type-aware operations like map, mapGroups, etc., you have to provide an actual JVM function. That does not fit in with the SQL language structure.
>
> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali wrote:
>> Hi All,
>>
>> can we use mapGroupsWithState in raw SQL? or is it on the roadmap?
>>
>> Thanks!
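To make the struct-based approach discussed above concrete, here is a small batch sketch (spark-shell, illustrative only) over the sample data from the question. Putting my_timestamp first in the struct makes max pick the row with the latest timestamp per id, which reproduces the expected output; the streaming version is the groupBy/agg query shown earlier in the thread.

  import org.apache.spark.sql.functions.{max, struct}
  import spark.implicits._

  val df = Seq(
    (1, 5,  "2018-04-01T01:00:00.000Z"),
    (1, 10, "2018-04-01T01:10:00.000Z"),
    (2, 20, "2018-04-01T01:20:00.000Z"),
    (2, 30, "2018-04-01T01:25:00.000Z"),
    (2, 40, "2018-04-01T01:30:00.000Z")
  ).toDF("id", "amount", "my_timestamp")

  // the ordering column goes first inside the struct; the rest ride along
  df.groupBy($"id")
    .agg(max(struct($"my_timestamp", $"amount")).as("data"))
    .select($"id", $"data.amount", $"data.my_timestamp")
    .show(false)
  // 1 | 10 | 2018-04-01T01:10:00.000Z
  // 2 | 40 | 2018-04-01T01:30:00.000Z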
[Spark 2.3] GLM Poisson issue
Has anyone run a Poisson GLM model and successfully obtained the GeneralizedLinearRegressionTrainingSummary object (to access p-values, t-values, deviances, AIC, etc.)?

I have tried to fit two datasets to compare Spark vs R outputs. Both models ran fine in Spark and I was able to get the coefficients back, but when I tried to get the GeneralizedLinearRegressionTrainingSummary object I always got the following error:

java.lang.NumberFormatException
  at java.math.BigDecimal.<init>(BigDecimal.java:494)
  at java.math.BigDecimal.<init>(BigDecimal.java:824)
  at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)
  at scala.math.BigDecimal$.apply(BigDecimal.scala:249)
  at org.apache.spark.ml.regression.GeneralizedLinearRegressionTrainingSummary.org$apache$spark$ml$regression$GeneralizedLinearRegressionTrainingSummary$$round$1(GeneralizedLinearRegression.scala:1503)
  at org.apache.spark.ml.regression.GeneralizedLinearRegressionTrainingSummary.toString(GeneralizedLinearRegression.scala:1551)
  at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)

Looking into the Spark 2.3 source code, GeneralizedLinearRegressionTrainingSummary's toString method throws this error when rounding the deviances; for some reason the deviances are NaN. I ran the same model in Spark 2.2.0, where I can get the GeneralizedLinearRegressionTrainingSummary object fine (with slightly different code than Spark 2.3), but the deviances are also NaN.

Does anyone know why the deviances are NaN?

Thanks,
Srikar.V
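For context, a minimal sketch of the kind of model fit being described (assumed, since the original code is not shown). Accessing the summary fields individually avoids the toString call that triggers the NumberFormatException above and shows directly whether the deviances are NaN.

  import org.apache.spark.ml.linalg.Vectors
  import org.apache.spark.ml.regression.GeneralizedLinearRegression
  import spark.implicits._

  // tiny stand-in dataset; the real data is the poster's
  val training = Seq(
    (1.0, Vectors.dense(0.0, 1.0)),
    (2.0, Vectors.dense(1.0, 0.0)),
    (3.0, Vectors.dense(2.0, 1.0)),
    (4.0, Vectors.dense(1.0, 2.0))
  ).toDF("label", "features")

  val glr = new GeneralizedLinearRegression()
    .setFamily("poisson")
    .setLink("log")

  val model = glr.fit(training)
  val summary = model.summary   // GeneralizedLinearRegressionTrainingSummary

  // inspect the fields without triggering summary.toString
  println(summary.deviance)
  println(summary.nullDeviance)
  println(summary.aic)
  println(summary.pValues.mkString(", "))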
Re: Why doesn't spark use broadcast join?
Try running AnalyzeTableCommand on both tables first.

On Wed, Apr 18, 2018 at 2:57 AM Matteo Cossu wrote:
> Can you check the value of spark.sql.autoBroadcastJoinThreshold?
Re: Why doesn't spark use broadcast join?
Can you check the value of spark.sql.autoBroadcastJoinThreshold?

On 29 March 2018 at 14:41, Vitaliy Pisarev wrote:
> I am looking at the physical plan for the following query:
>
> SELECT f1, f2, f3, ...
> FROM T1
> LEFT ANTI JOIN T2 ON T1.id = T2.id
> WHERE f1 = 'bla'
>   AND f2 = 'bla2'
>   AND some_date >= date_sub(current_date(), 1)
> LIMIT 100
>
> An important detail: the table T1 can be very large (hundreds of thousands of rows), but table T2 is rather small, at most in the thousands. In this particular case, T2 has 2 rows.
>
> In the physical plan, I see that a SortMergeJoin is performed, despite this being a perfect candidate for a broadcast join.
>
> What could be the reason for this? Is there a way to hint the optimizer to perform a broadcast join in the SQL syntax?
>
> I am writing this in pyspark and the query itself is over parquets stored in Azure blob storage.
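If statistics alone don't change the plan, the small side can usually be forced explicitly. A sketch in Scala (the same broadcast function and SQL hint also exist in pyspark), with t1/t2 standing in for DataFrames over the tables from the question:

  import org.apache.spark.sql.functions.broadcast

  // DataFrame API: mark the small table as the broadcast side
  val result = t1.join(broadcast(t2), Seq("id"), "left_anti")

  // SQL (Spark 2.2+): a broadcast hint in the query text
  spark.sql("""
    SELECT /*+ BROADCAST(T2) */ f1, f2, f3
    FROM T1
    LEFT ANTI JOIN T2 ON T1.id = T2.id
    WHERE f1 = 'bla'
  """)

  // the threshold itself can be inspected or raised
  spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (50L * 1024 * 1024).toString)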
"not in" sql spend a lot of time
Hi,

When I execute SQL like this:

select * from onlineDevice where deviceId not in (select deviceId from historyDevice)

the task spends a lot of time (over 40 minutes). I stopped the task, but I can't find the reason from the Spark history UI. The historyDevice and onlineDevice tables each contain only about 3 million records.

spark-submit: --master yarn --deploy-mode client --driver-memory 8G --num-executors 2 --executor-memory 9G --executor-cores 6
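One way to see where the time goes without relying on the history UI is to print the physical plan; a sketch below, assuming both tables are available as views as in the query above. A NOT IN over a nullable column typically compiles to a null-aware anti join executed as a BroadcastNestedLoopJoin, which is roughly quadratic in the row counts; if deviceId can never be null, an explicit left anti join usually gets a much cheaper plan.

  // inspect the plan of the slow query
  spark.sql(
    """select * from onlineDevice
      |where deviceId not in (select deviceId from historyDevice)""".stripMargin).explain(true)

  // equivalent rewrite when deviceId is guaranteed non-null on both sides
  spark.sql(
    """select o.* from onlineDevice o
      |left anti join historyDevice h on o.deviceId = h.deviceId""".stripMargin).explain(true)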