Hi Debasish - No, there are actually 14 columns, any of which can be specified at runtime by the user. There is a UI which allows the user to specify predicates on any of the 14 columns. They submit the form and we generate a filter like the one below:

val fnm303 = spp.filter("product = 'FNM30' and coupon = '3.0' and cs < 700 and state = 'NY' and occ_status = 'Y' and prop_type = 'MF' and balance > 250000")

Then we group/aggregate/collect based on that filter (and hopefully get an answer back in under a second). We can't pre-aggregate the data because of the combinatorial explosion. When I get back to the office I'll get you the numbers for the uncached, partitioned queries. I remember that for a partitioned Parquet table of the full dataset it was 3.5 sec using 20 cores.

Thanks,
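For illustration, here is a minimal sketch (untested; the Predicate helper and buildFilter function are hypothetical, not our actual UI code) of how a filter string like the one above can be assembled from user-supplied predicates and fed into the same group/aggregate/collect flow:

// Same table setup as in the original post quoted below.
val sql = new org.apache.spark.sql.hive.HiveContext(sc)
val spp = sql.table("spp")

// Hypothetical representation of one UI predicate: column, operator, literal.
case class Predicate(column: String, op: String, value: String)

// Join the user's predicates into a single expression for DataFrame.filter().
def buildFilter(preds: Seq[Predicate]): String =
  preds.map(p => s"${p.column} ${p.op} ${p.value}").mkString(" and ")

// Example: four of the fourteen filterable columns.
val preds = Seq(
  Predicate("product", "=", "'FNM30'"),
  Predicate("coupon", "=", "'3.0'"),
  Predicate("cs", "<", "700"),
  Predicate("balance", ">", "250000"))

// Filter, cache, then group/aggregate/collect as before.
val filtered = spp.filter(buildFilter(preds)).cache()
val result = filtered.groupBy("month")
  .sum("balance", "curtailment", "prepayment", "scheduled_bal")
  .collect()
result.foreach(println)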
On Tuesday, June 30, 2015, Debasish Das <debasish.da...@gmail.com> wrote:

> You have 4 columns, right... You should be able to extract 3 KV structures:
> (product, coupon), (product, cs) and (product, balance)... Now map your SQL
> to API calls and let the API combine the results from the 3 KV scans to
> construct the result... I agree that as this becomes more complicated you
> will move from a KV store to Spark SQL...
>
> You should be able to partition the dataset through HiveContext, and I
> think even Spark 1.3/1.4 supports writing/reading partitioned tables... but
> I have not tried it yet to see the performance impact... the documentation
> says Spark SQL should read partitioned tables...
>
> Could you please share your results with partitioned tables?
>
> On Tue, Jun 30, 2015 at 5:24 AM, Eric Pederson <eric...@gmail.com> wrote:
>
>> Hi Deb -
>>
>> One other consideration is that the filter will always contain two key
>> columns (product and coupon) but will also contain other columns. For
>> example, instead of
>>
>> val fnm303 = spp.filter("product = 'FNM30' and coupon = '3.0'")
>>
>> the user could ask for:
>>
>> val fnm303 = spp.filter("product = 'FNM30' and coupon = '3.0' and cs <
>> 700 and balance > 250000")
>>
>> The filter conditions are supplied by the user in a UI, so the number of
>> possible combinations is way too large to, as you suggest, pre-aggregate
>> and query against the pre-aggregated data. Because of this we'll always
>> have to scan through all of the data.
>>
>> I think we might be able to throw more machines at the problem now.
>> Previously the time flattened out around 10 cores (i.e., 20 or 30 cores
>> made no improvement), but that was before I reduced
>> spark.sql.shuffle.partitions.
>>
>> This brings up another question/issue: there doesn't seem to be a way to
>> partition cached tables in the same way you can partition, say, a Hive
>> table. For example, we would like to partition the overall dataset (233m
>> rows, 9.2GB) by (product, coupon) so that when we run one of these queries
>> Spark won't have to scan all the data, just the partition named in the
>> query, e.g., (FNM30, 3.0).
>>
>> Thanks!
>>
>> -- Eric
>>
>> On Mon, Jun 29, 2015 at 11:29 PM, Debasish Das <debasish.da...@gmail.com>
>> wrote:
>>
>>> Eric,
>>>
>>> We have discussed use cases like this over here, and the dataset sizes
>>> are similar to what you mentioned.
>>>
>>> I could get 3 seconds out of Spark SQL, but with Spark IndexedRDD we can
>>> get to the ~100 ms domain. I feel that Memcached/HBase will take us to
>>> the 10 ms domain if you can extract the key-value structure out of your
>>> SQL Rows. Basically, your groupBy flow will generate the data that you
>>> will put in the KV store.
>>>
>>> https://github.com/amplab/spark-indexedrdd/issues/5
>>>
>>> I will update the issue further as I finish the comparisons.
>>>
>>> I feel that to serve the models in a meaningful way, we need Spark job
>>> access (perhaps through RDD and IndexedRDD) plus a key-value based
>>> solution. I am not yet sure whether serving models needs a
>>> write-optimized store like HBase/Cassandra, or whether a read-optimized
>>> Memcached/Redis is sufficient.
>>>
>>> Thanks,
>>> Deb
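As a rough illustration of the partitioned-table idea (an untested sketch; it assumes Spark 1.4's DataFrameWriter and an example output path, and reuses sql and spp from above - this is not necessarily how the 3.5 sec partitioned Parquet number was produced):

// Sketch only: write the full dataset as Parquet, partitioned by
// (product, coupon); the output path is just an example.
spp.write
  .partitionBy("product", "coupon")
  .parquet("/data/spp_by_product_coupon")

// Re-read the partitioned table; a filter on the partition columns should let
// Spark prune down to the single (FNM30, 3.0) directory instead of scanning
// all 233m rows.
val sppPart = sql.read.parquet("/data/spp_by_product_coupon")

sppPart.filter("product = 'FNM30' and coupon = '3.0'")
  .groupBy("month")
  .sum("balance", "curtailment", "prepayment", "scheduled_bal")
  .collect()
  .foreach(println)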
>>> On Mon, Jun 29, 2015 at 5:57 PM, Eric Pederson <eric...@gmail.com>
>>> wrote:
>>>
>>>> Hi Ilya -
>>>>
>>>> I will try that - thanks. However, I want to aggregate across the
>>>> dataset, not do lookups. Does your advice still apply?
>>>>
>>>> -- Eric
>>>>
>>>> On Mon, Jun 29, 2015 at 8:19 PM, Ganelin, Ilya <
>>>> ilya.gane...@capitalone.com> wrote:
>>>>
>>>>> Store the table in memory as a broadcast variable. It's small enough
>>>>> that it doesn't need to be an RDD. This will get you O(1) lookups.
>>>>>
>>>>> Thank you,
>>>>> Ilya Ganelin
>>>>>
>>>>> -----Original Message-----
>>>>> From: Eric Pederson [eric...@gmail.com]
>>>>> Sent: Monday, June 29, 2015 07:03 PM Eastern Standard Time
>>>>> To: user@spark.apache.org
>>>>> Subject: Subsecond queries possible?
>>>>>
>>>>> Dear all:
>>>>>
>>>>> In the query below, the spp table is 233m rows and the filtered/cached
>>>>> DataFrame is about 15m rows. The cached size is 652MB across 7
>>>>> partitions, which was the default (see screenshots below).
>>>>>
>>>>> I am running the query on a cluster of three 12-core machines with
>>>>> 196GB RAM each. The Spark app has 20 cores/5GB per node assigned. It
>>>>> takes roughly 2.5s to run these queries, even though the data is fully
>>>>> cached.
>>>>>
>>>>> I tried repartitioning so that the number of partitions matches the
>>>>> number of cores, assuming that since the data is cached in memory,
>>>>> more cores would make it faster. But using 20 partitions is actually
>>>>> about 2x slower; 7 seems to be the best.
>>>>>
>>>>> Is it possible to get queries like this running in sub-second time in
>>>>> Spark?
>>>>>
>>>>> val sql = new org.apache.spark.sql.hive.HiveContext(sc)
>>>>> val spp = sql.table("spp")
>>>>> val fnm303 = spp.filter("product = 'FNM30' and coupon = '3.0'")
>>>>> val fnm303c = fnm303.cache()
>>>>>
>>>>> val fnm303m = fnm303c.groupBy("month")
>>>>> val fnm303sum = fnm303m.sum("balance", "curtailment", "prepayment",
>>>>> "scheduled_bal")
>>>>>
>>>>> // This takes about 2.5s after it is cached
>>>>> fnm303sum.collect().foreach(println)
>>>>>
>>>>> The best performance I've gotten on this dataset is about 1.5s using
>>>>> Drill with a Parquet-based table and partition pruning.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> [image: Inline image 2]
>>>>> [image: Inline image 3]
>>>>> [image: Inline image 4]
>>>>> -- Eric
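For reference, a rough sketch (untested) of the tuning experiments described in the original post above, reusing sql and fnm303c from that post; the shuffle-partition value is only an example, since the thread does not say what value was actually used:

// Illustrative only: the two knobs discussed in this thread.

// 1. Lower the number of post-shuffle partitions from the default of 200;
//    the value 24 is just an example.
sql.setConf("spark.sql.shuffle.partitions", "24")

// 2. Repartition the cached subset to match the 20 cores and rerun the same
//    aggregation (per the post above, 20 partitions was ~2x slower than 7).
val fnm303c20 = fnm303c.repartition(20).cache()

fnm303c20.groupBy("month")
  .sum("balance", "curtailment", "prepayment", "scheduled_bal")
  .collect()
  .foreach(println)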
-- Sent from Gmail Mobile