64GB in Parquet could be many billions of rows because of the columnar
compression, and count distinct is by itself an expensive operation. This is
not just a Spark issue; even on Presto/Impala you would see performance dip
with count distincts. And the cluster is not that powerful either.

The other issue here is that Spark has to sift through all the data to get to
just a week's worth. To achieve better performance, you might want to
partition the data by date/week; Spark then wouldn't have to sift through
billions of rows to get to the millions it actually needs to aggregate.
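
Something along these lines might help (just a rough sketch with the Scala
DataFrame API on 1.4; the S3 paths and the "event_date" column name are made
up, and it relies on the same from_unixtime function your original query
already uses):

val events = sqlContext.read.parquet("s3n://bucket/usage_events")

// Derive a date string once and rewrite the data partitioned by it.
val withDate = events.selectExpr(
  "*",
  "from_unixtime(cast(timestamp_millis/1000 as bigint), 'yyyy-MM-dd') as event_date")

withDate.write
  .partitionBy("event_date")
  .parquet("s3n://bucket/usage_events_by_date")

// A filter on the partition column should then only touch the matching
// date directories instead of scanning every file.
val week = sqlContext.read.parquet("s3n://bucket/usage_events_by_date")
  .filter("event_date between '2015-06-09' and '2015-06-16'")

Whether daily or weekly partitions make more sense depends on how much data
lands per day; the point is just to let the date filter prune files rather
than scan all of them.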

Regards
Sab

On Tue, Jun 23, 2015 at 4:35 PM, James Aley <james.a...@swiftkey.com> wrote:

> Thanks for the suggestions everyone, appreciate the advice.
>
> I tried replacing DISTINCT for the nested GROUP BY, running on 1.4 instead
> of 1.3, replacing the date casts with a "between" operation on the
> corresponding long constants instead and changing COUNT(*) to COUNT(1).
> None of these seem to have made any remarkable difference in running time
> for the query.
>
> I'll hook up YourKit and see if we can figure out where the CPU time is
> going, then post back.
>
> On 22 June 2015 at 16:01, Yin Huai <yh...@databricks.com> wrote:
>
>> Hi James,
>>
>> Maybe it's the DISTINCT causing the issue.
>>
>> I rewrote the query as follows. Maybe this one can finish faster.
>>
>> select
>>   sum(cnt) as uses,
>>   count(id) as users
>> from (
>>   select
>>     count(*) cnt,
>>     cast(id as string) as id
>>   from usage_events
>>   where
>>     from_unixtime(cast(timestamp_millis/1000 as bigint)) between
>> '2015-06-09' and '2015-06-16'
>>   group by cast(id as string)
>> ) tmp
>>
>> Thanks,
>>
>> Yin
>>
>> On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke <jornfra...@gmail.com>
>> wrote:
>>
>>> Generally (not only Spark SQL specific), you should not cast in the WHERE
>>> part of a SQL query. It is also not necessary in your case. Getting rid of
>>> casts in the whole query will also be beneficial.
>>>
>>> Le lun. 22 juin 2015 à 17:29, James Aley <james.a...@swiftkey.com> a
>>> écrit :
>>>
>>>> Hello,
>>>>
>>>> A colleague of mine ran the following Spark SQL query:
>>>>
>>>> select
>>>>   count(*) as uses,
>>>>   count (distinct cast(id as string)) as users
>>>> from usage_events
>>>> where
>>>>   from_unixtime(cast(timestamp_millis/1000 as bigint))
>>>> between '2015-06-09' and '2015-06-16'
>>>>
>>>> The table contains billions of rows, but totals only 64GB of data
>>>> across ~30 separate files, which are stored as Parquet with LZO compression
>>>> in S3.
>>>>
>>>> From the referenced columns:
>>>>
>>>> * id is Binary, which we cast to a String so that we can DISTINCT by
>>>> it. (I was already told this will improve in a later release, in a separate
>>>> thread.)
>>>> * timestamp_millis is a long, containing a unix timestamp with
>>>> millisecond resolution
>>>>
>>>> This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
>>>> instances, using 20 executors, each with 4GB memory. I can see from
>>>> monitoring tools that the CPU usage is at 100% on all nodes, but incoming
>>>> network seems a bit low at 2.5MB/s, suggesting to me that this is 
>>>> CPU-bound.
>>>>
>>>> Does that seem slow? Can anyone offer any ideas by glancing at the
>>>> query as to why this might be slow? We'll profile it meanwhile and post
>>>> back if we find anything ourselves.
>>>>
>>>> A side issue - I've found that this query, and others, sometimes
>>>> completes but doesn't return any results. There appears to be no error that
>>>> I can see in the logs, and Spark reports the job as successful, but the
>>>> connected JDBC client (SQLWorkbenchJ in this case) just sits there forever
>>>> waiting. I did a quick Google and couldn't find anyone else having similar
>>>> issues.
>>>>
>>>>
>>>> Many thanks,
>>>>
>>>> James.
>>>>
>>>
>>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
