Hi Manoj,

Yes, you've got it right. I think the timestamp type support in the
in-memory columnar storage can be a good reference for you. Also, you may
want to enable compression support for the decimal type by adding the DECIMAL
column type to RunLengthEncoding.supports and DictionaryEncoding.supports.
Thanks for working on this!
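
For what it's worth, here is a minimal Python sketch (not Spark code) of the
"long with decoration" idea raised earlier in this thread, plus the kind of
run-length encoding that compression would apply on top of it. It assumes
decimal(14,4) values, whose unscaled form fits in a 64-bit long. The names
to_unscaled, from_unscaled and run_length_encode are made up for illustration
and are not part of the Spark API.

```python
from decimal import Decimal

# Assumption: the decimal(14,4) columns mentioned in the thread.
PRECISION = 14
SCALE = 4


def to_unscaled(value, scale=SCALE, precision=PRECISION):
    """Return value * 10**scale as an int (hypothetical helper)."""
    scaled = value.scaleb(scale)
    # Reject values with more fractional digits than the scale allows.
    if scaled != scaled.to_integral_value():
        raise ValueError("value has more than %d fractional digits" % scale)
    unscaled = int(scaled)
    # Reject values with more total digits than the precision allows.
    if abs(unscaled) >= 10 ** precision:
        raise ValueError("value exceeds precision %d" % precision)
    return unscaled


def from_unscaled(unscaled, scale=SCALE):
    """Rebuild the Decimal from its unscaled integer form."""
    return Decimal(unscaled).scaleb(-scale)


def run_length_encode(values):
    """Collapse consecutive equal values into (value, count) pairs."""
    runs = []
    for v in values:
        if runs and runs[-1][0] == v:
            runs[-1][1] += 1
        else:
            runs.append([v, 1])
    return [(v, n) for v, n in runs]


column = [Decimal("12.3456"), Decimal("12.3456"), Decimal("-1.5")]
encoded = run_length_encode([to_unscaled(d) for d in column])
decoded = [from_unscaled(v) for v, n in encoded for _ in range(n)]
```

Since 10**14 is well below 2**63, every decimal(14,4) value fits in a signed
64-bit long, and the scale only needs to be stored once per column rather than
once per value.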

Best,
Cheng

On Sat, Feb 14, 2015 at 5:32 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> That sounds right to me.  Cheng could elaborate if you are missing
> something.
>
> On Fri, Feb 13, 2015 at 11:36 AM, Manoj Samel <manojsamelt...@gmail.com>
> wrote:
>
>> Thanks Michael for the pointer, and sorry for the delayed reply.
>>
>> Taking a quick inventory of the scope of the change: is the column type for
>> Decimal caching needed only in the caching layer (4 files
>> in org.apache.spark.sql.columnar: ColumnAccessor.scala,
>> ColumnBuilder.scala, ColumnStats.scala, ColumnType.scala)?
>>
>> Or do other SQL components also need to be touched?
>>
>> Hoping for quick feedback off the top of your head ...
>>
>> Thanks,
>>
>>
>>
>> On Mon, Feb 9, 2015 at 3:16 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> You could add a new ColumnType
>>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala>
>>> .
>>>
>>> PRs welcome :)
>>>
>>> On Mon, Feb 9, 2015 at 3:01 PM, Manoj Samel <manojsamelt...@gmail.com>
>>> wrote:
>>>
>>>> Hi Michael,
>>>>
>>>> As a test, I have the same data loaded as another Parquet table, except
>>>> with the 2 decimal(14,4) columns replaced by double. With this, the on-disk
>>>> size is ~345 MB, the in-memory size is 2 GB (vs. 12 GB), and the cached
>>>> query runs in half the time of the uncached query.
>>>>
>>>> Would it be possible for Spark to store in-memory decimals in some form
>>>> of long with decoration?
>>>>
>>>> For the immediate future, is there any hook that we can use to provide
>>>> custom caching / processing for the decimal type in an RDD so that other
>>>> semantics do not change?
>>>>
>>>> Thanks,
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Feb 9, 2015 at 2:41 PM, Manoj Samel <manojsamelt...@gmail.com>
>>>> wrote:
>>>>
>>>>> Could you share which data types are optimized in the in-memory
>>>>> storage and how they are optimized?
>>>>>
>>>>> On Mon, Feb 9, 2015 at 2:33 PM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> You'll probably only get good compression for strings when dictionary
>>>>>> encoding works.  We don't optimize decimals in the in-memory columnar
>>>>>> storage, so you are likely paying for expensive serialization there.
>>>>>>
>>>>>> On Mon, Feb 9, 2015 at 2:18 PM, Manoj Samel <manojsamelt...@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> Flat data of types String, Int and a couple of decimal(14,4)
>>>>>>>
>>>>>>> On Mon, Feb 9, 2015 at 1:58 PM, Michael Armbrust <
>>>>>>> mich...@databricks.com> wrote:
>>>>>>>
>>>>>>>> Is this nested data or flat data?
>>>>>>>>
>>>>>>>> On Mon, Feb 9, 2015 at 1:53 PM, Manoj Samel <
>>>>>>>> manojsamelt...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Michael,
>>>>>>>>>
>>>>>>>>> The storage tab shows the RDD resides fully in memory (10
>>>>>>>>> partitions) with zero disk usage. Tasks for a subsequent select on
>>>>>>>>> this cached table show minimal overheads (GC, queueing, shuffle
>>>>>>>>> write, etc.), so overhead is not the issue. However, it is still
>>>>>>>>> twice as slow as reading the uncached table.
>>>>>>>>>
>>>>>>>>> I have spark.rdd.compress = true,
>>>>>>>>> spark.sql.inMemoryColumnarStorage.compressed = true, and
>>>>>>>>> spark.serializer = org.apache.spark.serializer.KryoSerializer.
>>>>>>>>>
>>>>>>>>> Something that may be of relevance ...
>>>>>>>>>
>>>>>>>>> The underlying table is Parquet, 10 partitions totaling ~350 MB.
>>>>>>>>> The mapPartition phase of the query on the uncached table shows an
>>>>>>>>> input size of 351 MB. However, after the table is cached, the
>>>>>>>>> storage tab shows the cache size as 12 GB. So the in-memory
>>>>>>>>> representation seems much bigger than on-disk, even with the
>>>>>>>>> compression options turned on. Any thoughts on this?
>>>>>>>>>
>>>>>>>>> The mapPartition phase of the same query on the cached table shows
>>>>>>>>> an input size of 12 GB (the full size of the cached table) and takes
>>>>>>>>> twice the time of mapPartition for the uncached query.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Feb 6, 2015 at 6:47 PM, Michael Armbrust <
>>>>>>>>> mich...@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> Check the storage tab.  Does the table actually fit in memory?
>>>>>>>>>> Otherwise you are rebuilding column buffers in addition to reading
>>>>>>>>>> the data off of the disk.
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 6, 2015 at 4:39 PM, Manoj Samel <
>>>>>>>>>> manojsamelt...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Spark 1.2
>>>>>>>>>>>
>>>>>>>>>>> Data stored in parquet table (large number of rows)
>>>>>>>>>>>
>>>>>>>>>>> Test 1
>>>>>>>>>>>
>>>>>>>>>>> select a, sum(b), sum(c) from table
>>>>>>>>>>>
>>>>>>>>>>> Test 2
>>>>>>>>>>>
>>>>>>>>>>> sqlContext.cacheTable()
>>>>>>>>>>> select a, sum(b), sum(c) from table  - "seed cache"; first time
>>>>>>>>>>> slow since it is loading the cache?
>>>>>>>>>>> select a, sum(b), sum(c) from table  - second time it should be
>>>>>>>>>>> faster, as it should be reading from cache, not HDFS. But it is
>>>>>>>>>>> slower than Test 1.
>>>>>>>>>>>
>>>>>>>>>>> Any thoughts? Should a different query be used to seed the cache?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
