Hi Debasish -

No, there are actually 14 columns, any of which can be specified at runtime
by the user.  There is a UI that allows the user to specify predicates on
any of the 14 columns.  When they submit the form we generate
a filter like the one below:

val fnm303 = spp.filter(
  "product = 'FNM30' and coupon = '3.0' and cs < 700 and state = 'NY' " +
  "and occ_status = 'Y' and prop_type = 'MF' and balance > 250000")


Then we group/aggregate/collect based on that filter (and hopefully get an
answer back in under a second).  We can't pre-aggregate the data because of
the combinatorial explosion.
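
Roughly, the flow looks like this (a sketch only: Predicate and
toFilterExpr are made-up names standing in for our UI layer, and the
predicate values are just examples):

case class Predicate(column: String, op: String, value: String)

// Turn the predicates the UI submits into a single filter expression.
def toFilterExpr(preds: Seq[Predicate]): String =
  preds.map(p => s"${p.column} ${p.op} ${p.value}").mkString(" and ")

val preds = Seq(
  Predicate("product", "=", "'FNM30'"),
  Predicate("coupon", "=", "'3.0'"),
  Predicate("cs", "<", "700"))

val answer = spp.filter(toFilterExpr(preds))
  .groupBy("month")
  .sum("balance", "curtailment", "prepayment", "scheduled_bal")
  .collect()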

When I get back to the office I'll get you the numbers for the uncached,
partitioned queries.  I remember that a partitioned Parquet table of
the full dataset took 3.5 sec using 20 cores.

Thanks,

On Tuesday, June 30, 2015, Debasish Das <debasish.da...@gmail.com> wrote:

> You have 4 columns right...You should be able to extract 3 KV structures:
> (product, coupon), (product, cs) and (product, balance)...Now map your SQL
> to API calls and let the API combine the results from the 3 KV scans to
> construct the result...I agree that as this becomes more complicated you
> will move from a KV store to Spark SQL...
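>
> Rough sketch of what I mean (illustrative types only; a local Map stands
> in for the KV store):
>
> type RowId = Long
> // one scan structure per column pair: product -> (secondary value, row id)
> val byCoupon:  Map[String, Seq[(Double, RowId)]] = Map.empty
> val byCs:      Map[String, Seq[(Int, RowId)]]    = Map.empty
> val byBalance: Map[String, Seq[(Double, RowId)]] = Map.empty
>
> def query(product: String, coupon: Double, maxCs: Int, minBal: Double): Set[RowId] = {
>   val a = byCoupon.getOrElse(product, Nil).collect { case (c, id) if c == coupon => id }.toSet
>   val b = byCs.getOrElse(product, Nil).collect { case (s, id) if s < maxCs => id }.toSet
>   val c = byBalance.getOrElse(product, Nil).collect { case (v, id) if v > minBal => id }.toSet
>   a intersect b intersect c
> }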
>
> You should be able to partition the dataset through HiveContext, and I
> think even Spark 1.3/1.4 supports writing/reading through partitioned
> tables...but I have not tried it yet to see the performance impact...the
> documentation says Spark SQL should read partitioned tables...
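>
> Something along these lines, if I remember the 1.4 DataFrameWriter API
> correctly (the path is just an example and sqlContext stands for your
> HiveContext):
>
> spp.write
>   .partitionBy("product", "coupon")
>   .parquet("/data/spp_by_product_coupon")
>
> // Partition discovery should let a filter on product/coupon prune directories:
> val pruned = sqlContext.read.parquet("/data/spp_by_product_coupon")
>   .filter("product = 'FNM30' and coupon = '3.0'")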
>
> Could you please share your results with partitioned tables ?
>
> On Tue, Jun 30, 2015 at 5:24 AM, Eric Pederson <eric...@gmail.com> wrote:
>
>> Hi Deb -
>>
>> One other consideration is that the filter will always contain two key
>> columns (product and coupon) but will also contain other columns.  For
>> example, instead of
>>
>> val fnm303 = spp.filter("product = 'FNM30' and coupon = '3.0'")
>>
>>
>> the user could ask for:
>>
>> val fnm303 = spp.filter(
>>   "product = 'FNM30' and coupon = '3.0' and cs < 700 and balance > 250000")
>>
>>
>> The filter conditions are supplied by the user in a UI.  So the number of
>> possible combinations is way too large to, as you suggest, pre-aggregate
>> and query against the pre-aggregated data.   Because of this we'll always
>> have to scan through all of the data.
>>
>> I think we might be able to throw more machines at the problem now.
>> Previously the time flattened out around 10 cores (i.e., 20 or 30 cores made
>> no improvement).  But that was before I reduced spark.sql.shuffle.partitions.
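>>
>> For reference, this is the knob I changed (the value shown is just an
>> example; sql is the HiveContext from my snippet further down):
>>
>> sql.setConf("spark.sql.shuffle.partitions", "20")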
>>
>>
>> This brings up another question/issue: there doesn't seem to be a way to
>> partition cached tables in the same way you can partition, say, a Hive
>> table.  For example, we would like to partition the overall dataset (233m
>> rows, 9.2 GB) by (product, coupon) so that when we run one of these queries
>> Spark won't have to scan all the data, just the partition matching the query,
>> e.g., (FNM30, 3.0).
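>>
>> The closest workaround I can think of (sketch only; the key list would
>> really be derived from the data) is to pre-split the cached data by the two
>> key columns and keep a map of small cached DataFrames, so a query only
>> touches the slice it needs:
>>
>> val keys = Seq(("FNM30", "3.0"), ("FNM30", "3.5"))   // illustrative
>> val slices = keys.map { case (p, c) =>
>>   (p, c) -> spp.filter(s"product = '$p' and coupon = '$c'").cache()
>> }.toMap
>>
>> // At query time, pick the slice and apply the remaining user predicates:
>> val result = slices(("FNM30", "3.0"))
>>   .filter("cs < 700 and balance > 250000")
>>   .groupBy("month")
>>   .sum("balance", "curtailment", "prepayment", "scheduled_bal")
>>   .collect()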
>>
>> Thanks!
>>
>> -- Eric
>>
>> On Mon, Jun 29, 2015 at 11:29 PM, Debasish Das <debasish.da...@gmail.com>
>> wrote:
>>
>>> Eric,
>>>
>>> We have discussed use cases like these over here, and the dataset sizes
>>> are similar to what you mentioned.
>>>
>>> I could get about 3 seconds out of Spark SQL, but with Spark IndexedRDD we
>>> can get into the ~100 ms domain. I feel that Memcached/HBase will take us to
>>> the 10 ms domain if you can extract the key-value structure out of your SQL
>>> Rows. Basically, your groupBy flow will generate the data that you will put
>>> into a KV store.
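>>>
>>> A sketch of that flow (column names taken from your snippet; a local Map
>>> stands in for Memcached/HBase here):
>>>
>>> val kv = spp
>>>   .groupBy("product", "coupon", "month")
>>>   .sum("balance", "curtailment", "prepayment", "scheduled_bal")
>>>   .collect()
>>>   .map(r => (r.get(0), r.get(1), r.get(2)) -> r)   // keyed by the grouping columns
>>>   .toMap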
>>>
>>> https://github.com/amplab/spark-indexedrdd/issues/5
>>>
>>> I will update the issue further as I finish the comparisons.
>>>
>>> I feel that to serve the models in a meaningful way, we need Spark job
>>> access (perhaps through RDD and IndexedRDD) and a key-value based solution.
>>> I am not yet sure whether, for serving models, we need a write-optimized
>>> solution like HBase/Cassandra or whether a read-optimized Memcached/Redis is
>>> sufficient.
>>>
>>> Thanks,
>>> Deb
>>>
>>> On Mon, Jun 29, 2015 at 5:57 PM, Eric Pederson <eric...@gmail.com>
>>> wrote:
>>>
>>>> Hi Ilya -
>>>>
>>>> I will try that - thanks.  However, I want to aggregate across the
>>>> dataset, not do lookups.   Does your advice still apply?
>>>>
>>>>
>>>>
>>>> -- Eric
>>>>
>>>> On Mon, Jun 29, 2015 at 8:19 PM, Ganelin, Ilya <
>>>> ilya.gane...@capitalone.com> wrote:
>>>>
>>>>>  Store the table in memory as a broadcast variable. It's small enough
>>>>> that it doesn't need to be an RDD. This will get you O(1) lookups.
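>>>>>
>>>>> For example (sketch only; assumes the filtered slice fits comfortably on
>>>>> the driver, and the key column is just illustrative):
>>>>>
>>>>> val local = fnm303c.collect().map(r => r.get(0) -> r).toMap
>>>>> val bcTable = sc.broadcast(local)
>>>>> val hit = bcTable.value.get("FNM30")   // O(1) lookup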
>>>>>
>>>>>
>>>>>
>>>>> Thank you,
>>>>> Ilya Ganelin
>>>>>
>>>>>
>>>>>
>>>>> -----Original Message-----
>>>>> From: Eric Pederson [eric...@gmail.com]
>>>>> Sent: Monday, June 29, 2015 07:03 PM Eastern Standard Time
>>>>> To: user@spark.apache.org
>>>>> Subject: Subsecond queries possible?
>>>>>
>>>>>  Dear all:
>>>>>
>>>>> In the query below, the spp table is 233m rows and the filtered/cached
>>>>> DataFrame is about 15m rows.  The cached size is 652 MB across 7
>>>>> partitions, which was the default (see screenshots below).
>>>>>
>>>>> I am running the query on a cluster of three 12-core machines with
>>>>> 196 GB of RAM each.  The Spark app has 20 cores/5 GB per node assigned.
>>>>> It takes roughly 2.5s to run these queries, even though the data is fully
>>>>> cached.
>>>>>
>>>>> I tried repartitioning so that the number of partitions matches the
>>>>> number of cores, assuming that since the data is cached in memory, more
>>>>> cores would make it faster.  But using 20 partitions is actually about 2x
>>>>> slower; 7 seems to be the best.
>>>>>
>>>>> Is it possible to get queries like this running in sub-second time in
>>>>> Spark?
>>>>>
>>>>>
>>>>>
>>>>> val sql = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>> val spp = sql.table("spp")
>>>>> val fnm303 = spp.filter("product = 'FNM30' and coupon = '3.0'")
>>>>> val fnm303c = fnm303.cache()
>>>>>
>>>>> val fnm303m = fnm303c.groupBy("month")
>>>>> val fnm303sum = fnm303m.sum("balance", "curtailment", "prepayment", "scheduled_bal")
>>>>>
>>>>> // This takes about 2.5s after it is cached
>>>>> fnm303sum.collect().foreach(println)
>>>>>
>>>>>
>>>>> The best performance I've got on this dataset is about 1.5s using
>>>>> Drill with a Parquet-based table and partition pruning.
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>>
>>>>> [screenshots attached]
>>>>> -- Eric
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>

-- 
Sent from Gmail Mobile
