Thanks Michael, that worked. But what's puzzling is that if I take the exact same code and run it off a temp table created from Parquet vs. a cached table, the cached version runs much slower: 5-10 seconds uncached vs. 47-60 seconds cached.
Any ideas why? Here's my code snippet:

df = data.select("customer_id", struct('dt', 'product').alias("vs"))\
    .groupBy("customer_id")\
    .agg(min("vs").alias("final"))\
    .select("customer_id", "final.dt", "final.product")
df.head()

My log from the non-cached run: http://pastebin.com/F88sSv1B
Log from the cached run: http://pastebin.com/Pmmfea3d

thanks,
imran

On Fri, Apr 8, 2016 at 12:33 PM, Michael Armbrust <mich...@databricks.com> wrote:

> You need to use the struct function
> <https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.functions.struct>
> (which creates an actual struct); you are trying to use the struct datatype
> (which just represents the schema of a struct).
>
> On Thu, Apr 7, 2016 at 3:48 PM, Imran Akbar <skunkw...@gmail.com> wrote:
>
>> thanks Michael,
>>
>> I'm trying to implement the code in pyspark like so (where my dataframe
>> has 3 columns - customer_id, dt, and product):
>>
>> st = StructType().add("dt", DateType(), True).add("product",
>> StringType(), True)
>>
>> top = data.select("customer_id", st.alias('vs'))\
>>     .groupBy("customer_id")\
>>     .agg(max("dt").alias("vs"))\
>>     .select("customer_id", "vs.dt", "vs.product")
>>
>> But I get an error saying:
>>
>> AttributeError: 'StructType' object has no attribute 'alias'
>>
>> Can I do this without aliasing the struct? Or am I doing something
>> incorrectly?
>>
>> regards,
>> imran
>>
>> On Wed, Apr 6, 2016 at 4:16 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>>> Ordering for a struct goes in order of the fields. So the max struct is
>>>> the one with the highest TotalValue (and then the highest category
>>>> if there are multiple entries with the same hour and total value).
>>>>
>>>> Is this due to "InterpretedOrdering" in StructType?
>>>
>>> That is one implementation, but the code-generated ordering also follows
>>> the same contract.
>>>
>>>> 4) Is it faster doing it this way than doing a join or window
>>>> function in Spark SQL?
>>>>
>>>> Way faster. This is a very efficient way to calculate argmax.
>>>>
>>>> Can you explain how this is way faster than a window function? I can
>>>> understand that a join doesn't make sense in this case. But to calculate
>>>> the grouping max, you just have to shuffle the data by the grouping keys.
>>>> You could maybe run a combiner on the mapper side before shuffling, but
>>>> that is it. Do you mean the windowing function in Spark SQL won't do any
>>>> map-side combining, even when it is computing a max?
>>>
>>> Windowing can't do partial aggregation and will have to collect all the
>>> data for a group so that it can be sorted before applying the function. In
>>> contrast, a max aggregation will do partial aggregation (map-side combining)
>>> and can be calculated in a streaming fashion.
>>>
>>> Also, aggregation is more common and thus has seen more optimization
>>> beyond the theoretical limits described above.
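
To make Michael's distinction concrete, here is a minimal sketch (my own illustration, not code from the thread): a StructType only describes a schema, while the struct function builds an actual struct-valued Column, which is why .alias() works on one and not the other.

from pyspark.sql.types import StructType, DateType, StringType
from pyspark.sql.functions import struct

# A StructType is just a schema description; it is not a Column,
# so it has no .alias() -- hence the AttributeError above.
st = StructType().add("dt", DateType(), True).add("product", StringType(), True)

# struct() builds a real struct-valued Column, which can be aliased.
vs = struct("dt", "product").alias("vs")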
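
For reference, a self-contained sketch of the full struct/min argmin pattern the thread converges on. This assumes a local SparkSession (the entry point from Spark 2.0+, later than the 1.x releases discussed here) and made-up sample data; only the column names (customer_id, dt, product) come from the thread. Because struct ordering compares fields left to right, min over the (dt, product) struct returns the whole row with the earliest dt per customer:

from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, min as min_

spark = SparkSession.builder.master("local[*]").getOrCreate()

data = spark.createDataFrame(
    [("c1", "2016-01-02", "apples"),
     ("c1", "2016-01-01", "oranges"),   # earliest dt for c1, so it wins
     ("c2", "2016-03-05", "bananas")],
    ["customer_id", "dt", "product"])

df = (data
      .select("customer_id", struct("dt", "product").alias("vs"))
      .groupBy("customer_id")
      .agg(min_("vs").alias("final"))    # min by dt, ties broken by product
      .select("customer_id", "final.dt", "final.product"))

df.show()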
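
And for comparison, a hypothetical window-function version of the same argmin (again my illustration, reusing the data frame from the sketch above). Every row in a customer_id partition must be shuffled and sorted before row_number() can be applied, which is Michael's point about why the aggregation, with its map-side partial aggregation, tends to be faster:

from pyspark.sql import Window
from pyspark.sql.functions import row_number

w = Window.partitionBy("customer_id").orderBy("dt")

first_per_customer = (data
    .withColumn("rn", row_number().over(w))   # rank rows within each customer
    .where("rn = 1")                          # keep the earliest dt per customer
    .select("customer_id", "dt", "product"))

first_per_customer.show()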