Re: groupBy and store in parquet

2016-05-12 Thread Michal Vince

Hi Xinh

sorry for my late reply

it's slow for two reasons (at least to my knowledge):

1. lots of I/O - writing as JSON, then reading it back in and writing it again as Parquet

2. because of the nested RDD I can't run the loop and filter by event_type
in parallel - this applies to your solution as well (3rd step)


I ended up with the suggestion you proposed - in real time, partition by
event_type and store as JSON (which is pretty fast), and have another job,
which runs less frequently, read the JSON files and store them as Parquet.
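
A minimal sketch of that two-job setup (the folder names, the directory
listing, and the compaction details are assumptions for illustration, not
the actual production code; hdfs and sqlContext are the same handles used
in the snippets earlier in the thread):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode

// Job 1 - runs on every streaming batch: dump the raw events as JSON,
// partitioned by event_type. This is cheap because no schema merging happens.
events.write
  .partitionBy("event_type")
  .format("json")
  .mode(SaveMode.Append)
  .save(rawJsonFolder)                               // placeholder path

// Job 2 - runs less frequently: for each event_type directory, re-read the
// JSON (so the schema is inferred per type, without the null columns) and
// append it to the Parquet store, then drop the compacted JSON.
for (status <- hdfs.listStatus(new Path(rawJsonFolder)) if status.isDirectory) {
  sqlContext.read.json(status.getPath.toString)      // e.g. .../event_type=login
    .write
    .format("parquet")
    .mode(SaveMode.Append)
    .save(parquetFolder + "/" + status.getPath.getName)
  hdfs.delete(status.getPath, true)
}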



thank you very much

best regards

Michal



On 05/05/2016 06:02 PM, Xinh Huynh wrote:

Hi Michal,

Why is your solution so slow? Is it from the file IO caused by storing 
in a temp file as JSON and then reading it back in and writing it as 
Parquet? How are you getting "events" in the first place?


Do you have the original Kafka messages as an RDD[String]? Then how about:

1. Start with eventsAsRDD : RDD[String] (before converting to DF)
2. eventsAsRDD.map() --> use a RegEx to parse out the event_type of 
each event

 For example, search the string for {"event_type"="[.*]"}
3. Now, filter by each event_type to create a separate RDD for each 
type, and convert those to DF. You only convert to DF for events of 
the same type, so you avoid the NULLs.


Xinh



Re: groupBy and store in parquet

2016-05-05 Thread Xinh Huynh
Hi Michal,

Why is your solution so slow? Is it from the file IO caused by storing in a
temp file as JSON and then reading it back in and writing it as Parquet?
How are you getting "events" in the first place?

Do you have the original Kafka messages as an RDD[String]? Then how about:

1. Start with eventsAsRDD : RDD[String] (before converting to DF)
2. eventsAsRDD.map() --> use a RegEx to parse out the event_type of each
event
 For example, search the string for {"event_type"="[.*]"}
3. Now, filter by each event_type to create a separate RDD for each type,
and convert those to DF. You only convert to DF for events of the same
type, so you avoid the NULLs.
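
A rough sketch of those three steps (the regex and variable names are only
illustrations and would need to match the real message layout; eventsAsRDD,
sqlContext, and basePath are assumed to be the values from the snippets
elsewhere in this thread):

import org.apache.spark.rdd.RDD

// Step 2: tag each raw JSON string with its event_type (drop unparseable ones).
val eventTypeRegex = "\"event_type\"\\s*:\\s*\"([^\"]+)\"".r
val tagged: RDD[(String, String)] = eventsAsRDD.flatMap { json =>
  eventTypeRegex.findFirstMatchIn(json).map(m => (m.group(1), json))
}

// Step 3: one pass per event_type - filter, then let read.json infer a schema
// containing only that type's fields, so no NULL columns are introduced.
val eventTypes = tagged.keys.distinct().collect()
for (eventType <- eventTypes) {
  val perTypeJson = tagged.filter(_._1 == eventType).values
  sqlContext.read.json(perTypeJson)                  // RDD[String] -> DataFrame
    .write
    .format("parquet")
    .mode(org.apache.spark.sql.SaveMode.Append)
    .save(s"$basePath/event_type=$eventType")
}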

Xinh


On Thu, May 5, 2016 at 2:52 AM, Michal Vince  wrote:

> Hi Xinh
>
> For (1) the biggest problem is those null columns, e.g. the DF will have
> ~1000 columns, so every partition of that DF will have ~1000 columns; one of
> the partitioned columns can have 996 null columns, which is a big waste of
> space (in my case more than 80% on average)
>
> for (2) I can't really change anything as the source belongs to the 3rd
> party
>
>
> Miso


Re: groupBy and store in parquet

2016-05-05 Thread Michal Vince

Hi Xinh

For (1) the biggest problem is those null columns, e.g. the DF will have
~1000 columns, so every partition of that DF will have ~1000 columns; one
of the partitioned columns can have 996 null columns, which is a big waste
of space (in my case more than 80% on average)


for (2) I can't really change anything as the source belongs to the 3rd
party



Miso


On 05/04/2016 05:21 PM, Xinh Huynh wrote:

Hi Michal,

For (1), would it be possible to partitionBy two columns to reduce the 
size? Something like partitionBy("event_type", "date").


For (2), is there a way to separate the different event types 
upstream, like on different Kafka topics, and then process them 
separately?


Xinh


Re: groupBy and store in parquet

2016-05-04 Thread Xinh Huynh
Hi Michal,

For (1), would it be possible to partitionBy two columns to reduce the
size? Something like partitionBy("event_type", "date").
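
For example, something like the following sketch (it assumes the events carry
a timestamp field - the name ts here is made up - from which a date column can
be derived):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.to_date

// Derive a date column and partition on both columns, so each event_type
// directory is further split into one folder per day.
events
  .withColumn("date", to_date($"ts"))
  .write
  .partitionBy("event_type", "date")
  .format("parquet")
  .mode(SaveMode.Append)
  .save(tmpFolder)
// Layout: tmpFolder/event_type=<type>/date=<yyyy-MM-dd>/part-*.parquet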

For (2), is there a way to separate the different event types upstream,
like on different Kafka topics, and then process them separately?

Xinh

On Wed, May 4, 2016 at 7:47 AM, Michal Vince  wrote:

> Hi guys
>
> I'm trying to store a Kafka stream with ~5k events/s as efficiently as
> possible, in Parquet format, on HDFS.
>
> I can't make any changes to Kafka (it belongs to a 3rd party).
>
>
> Events in Kafka are in JSON format, but the problem is that there are many
> different event types (from different subsystems, with different numbers of
> fields, different sizes, etc.), so it doesn't make sense to store them all in
> the same file.
>
>
> I was trying to read the data into a DF and then partition it by event_type
> and store it:
>
> events.write.partitionBy("event_type").format("parquet").mode(org.apache.spark.sql.SaveMode.Append).save(tmpFolder)
>
> which is quite fast but has 2 drawbacks that I'm aware of:
>
> 1. the output folder has only one partition, which can be huge
>
> 2. all DFs created like this share the same schema, so even DFs with few
> fields have tons of null fields
>
>
> My second try is a bit naive and really, really slow (you can see why in
> the code) - filter the DF by event type and store each type temporarily as
> JSON (to get rid of the null fields):
>
> val event_types = events.select($"event_type").distinct().collect() // get event_types in this batch
>
> for (row <- event_types) {
>   val currDF = events.filter($"event_type" === row.get(0))
>   val tmpPath = tmpFolder + row.get(0)
>   currDF.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).save(tmpPath)
>   sqlContext.read.json(tmpPath).write.format("parquet").save(basePath)
> }
>
> hdfs.delete(new Path(tmpFolder), true)
>
>
> Do you have any suggestions for a better solution to this?
>
> thanks
>
>
>


groupBy and store in parquet

2016-05-04 Thread Michal Vince

Hi guys

I'm trying to store a Kafka stream with ~5k events/s as efficiently as
possible, in Parquet format, on HDFS.


I can't make any changes to Kafka (it belongs to a 3rd party).


Events in Kafka are in JSON format, but the problem is that there are many
different event types (from different subsystems, with different numbers
of fields, different sizes, etc.), so it doesn't make sense to store
them all in the same file.
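
(For context: the events DataFrame used below would come from something like
the sketch here - the Kafka parameters, topic name, and batch interval are
made up, since the actual ingestion code isn't shown in this thread.)

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed setup: sc and sqlContext already exist; broker and topic are placeholders.
val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events-topic"))

stream.foreachRDD { rdd =>
  // The JSON payload is the message value; the schema is inferred across all event types.
  val events = sqlContext.read.json(rdd.map(_._2))
  // ... the write logic discussed below goes here ...
}

ssc.start()
ssc.awaitTermination()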



I was trying to read the data into a DF and then partition it by event_type
and store it:


events.write.partitionBy("event_type").format("parquet").mode(org.apache.spark.sql.SaveMode.Append).save(tmpFolder)

which is quite fast but has 2 drawbacks that I'm aware of:

1. the output folder has only one partition, which can be huge

2. all DFs created like this share the same schema, so even DFs with few
fields have tons of null fields



My second try is a bit naive and really, really slow (you can see why in
the code) - filter the DF by event type and store each type temporarily as
JSON (to get rid of the null fields):


val event_types = events.select($"event_type").distinct().collect() // get event_types in this batch

for (row <- event_types) {
  val currDF = events.filter($"event_type" === row.get(0))
  val tmpPath = tmpFolder + row.get(0)
  currDF.write.format("json").mode(org.apache.spark.sql.SaveMode.Append).save(tmpPath)
  sqlContext.read.json(tmpPath).write.format("parquet").save(basePath)
}

hdfs.delete(new Path(tmpFolder), true)


Do you have any suggestions for a better solution to this?

thanks