Re: Spark app writes too many small parquet files

2016-12-08 Thread Kevin Tran
How many partitions should there be when streaming? In a streaming process
the data keeps growing in size; is there any configuration to limit file
size and roll over to a new file once it exceeds x (let's say 128 MB per file)?

Another question about performance when querying these parquet files: what
is the recommended practice for file size and number of files?

How do we compact many small parquet files into a small number of bigger
parquet files?
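
One possible approach, sketched here with hypothetical paths and a placeholder
partition count (it is not something confirmed in this thread): run a small batch
job that reads the small files back and coalesces them before rewriting.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class CompactParquet {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("CompactParquet");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(jsc);

    // Read every small file under the (hypothetical) input directory ...
    DataFrame df = sqlContext.read().parquet("hdfs:///data/events/2016-12-08");

    // ... and rewrite them as a handful of larger files.
    // 8 is a placeholder: aim for roughly (total data size / 128 MB) partitions.
    df.coalesce(8)
      .write()
      .mode(SaveMode.Overwrite)
      .parquet("hdfs:///data/events_compacted/2016-12-08");

    jsc.stop();
  }
}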

Thanks,
Kevin.

On Tue, Nov 29, 2016 at 3:01 AM, Chin Wei Low <lowchin...@gmail.com> wrote:

> Try limiting the partitions: spark.sql.shuffle.partitions.
>
> This controls the number of files generated.
>
> On 28 Nov 2016 8:29 p.m., "Kevin Tran" <kevin...@gmail.com> wrote:
>
>> Hi Denny,
>> Thank you for your input. I also use 128 MB, but the Spark app still
>> generates too many files, each only ~14 KB! That's why I'm asking whether
>> there is a solution, in case someone else has hit the same issue.
>>
>> Cheers,
>> Kevin.
>>
>> On Mon, Nov 28, 2016 at 7:08 PM, Denny Lee <denny.g@gmail.com> wrote:
>>
>>> Generally, yes - you should try to have larger data sizes due to the
>>> overhead of opening up files.  Typical guidance is between 64 MB and 1 GB;
>>> personally I usually stick with 128 MB-512 MB with the default snappy codec
>>> compression for parquet.  A good reference is Vida Ha's presentation Data
>>> Storage Tips for Optimal Spark Performance
>>> <https://spark-summit.org/2015/events/data-storage-tips-for-optimal-spark-performance/>.
>>>
>>>
>>> On Sun, Nov 27, 2016 at 9:44 PM Kevin Tran <kevin...@gmail.com> wrote:
>>>
>>>> Hi Everyone,
>>>> Does anyone know what the best practice is for writing parquet files from
>>>> Spark?
>>>>
>>>> As the Spark app writes data to parquet, it turns out that under the output
>>>> directory there are heaps of very small parquet files (such as
>>>> e73f47ef-4421-4bcc-a4db-a56b110c3089.parquet). Each parquet file is
>>>> only 15 KB.
>>>>
>>>> Should it write bigger chunks of data (such as 128 MB each) with a proper
>>>> number of files?
>>>>
>>>> Has anyone noticed any performance changes when changing the data size of
>>>> each parquet file?
>>>>
>>>> Thanks,
>>>> Kevin.
>>>>
>>>
>>


Re: Spark app writes too many small parquet files

2016-11-28 Thread Kevin Tran
Hi Denny,
Thank you for your input. I also use 128 MB, but the Spark app still
generates too many files, each only ~14 KB! That's why I'm asking whether
there is a solution, in case someone else has hit the same issue.

Cheers,
Kevin.

On Mon, Nov 28, 2016 at 7:08 PM, Denny Lee <denny.g@gmail.com> wrote:

> Generally, yes - you should try to have larger data sizes due to the
> overhead of opening up files.  Typical guidance is between 64 MB and 1 GB;
> personally I usually stick with 128 MB-512 MB with the default snappy codec
> compression for parquet.  A good reference is Vida Ha's presentation Data
> Storage Tips for Optimal Spark Performance
> <https://spark-summit.org/2015/events/data-storage-tips-for-optimal-spark-performance/>.
>
>
> On Sun, Nov 27, 2016 at 9:44 PM Kevin Tran <kevin...@gmail.com> wrote:
>
>> Hi Everyone,
>> Does anyone know what the best practice is for writing parquet files from
>> Spark?
>>
>> As the Spark app writes data to parquet, it turns out that under the output
>> directory there are heaps of very small parquet files (such as
>> e73f47ef-4421-4bcc-a4db-a56b110c3089.parquet).
>> Each parquet file is only 15 KB.
>>
>> Should it write bigger chunks of data (such as 128 MB each) with a proper
>> number of files?
>>
>> Has anyone noticed any performance changes when changing the data size of
>> each parquet file?
>>
>> Thanks,
>> Kevin.
>>
>


Spark app writes too many small parquet files

2016-11-27 Thread Kevin Tran
Hi Everyone,
Does anyone know what the best practice is for writing parquet files from
Spark?

As the Spark app writes data to parquet, it turns out that under the output
directory there are heaps of very small parquet files (such as
e73f47ef-4421-4bcc-a4db-a56b110c3089.parquet). Each parquet file is only
15 KB.

Should it write bigger chunks of data (such as 128 MB each) with a proper
number of files?
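
A sketch of one way to cap the file count at write time (df, the partition
count and the output path are placeholders, not from this thread): reduce the
number of partitions right before the write, or lower the shuffle parallelism
that produced them.

import org.apache.spark.sql.SaveMode;

// df (a DataFrame about to be written) and sqlContext are assumed to exist already.

// Option 1: coalesce right before writing; each partition becomes one output file.
df.coalesce(4)                          // placeholder: aim for roughly 128 MB per file
  .write()
  .mode(SaveMode.Append)
  .parquet("hdfs:///data/events");      // placeholder path

// Option 2: if the partitions come out of a shuffle (join / aggregation),
// lowering the shuffle parallelism also bounds the number of files written.
sqlContext.setConf("spark.sql.shuffle.partitions", "4");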

Has anyone noticed any performance changes when changing the data size of
each parquet file?

Thanks,
Kevin.


Extract timestamp from Kafka message

2016-09-25 Thread Kevin Tran
Hi Everyone,
Does anyone know how we could extract the timestamp from a Kafka message in
Spark Streaming?

JavaPairInputDStream<String, String> messagesDStream =
    KafkaUtils.createDirectStream(
        ssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topics
    );
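
The 0.8 direct stream above has no timestamp to expose, because Kafka only added
per-message timestamps in 0.10. A sketch of reading them, assuming Kafka 0.10+
brokers and the spark-streaming-kafka-0-10 integration (ssc is the streaming
context from above; the topic name and consumer settings are placeholders):

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker:9092");          // placeholder
kafkaParams.put("group.id", "timestamp-demo");                // placeholder
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);

Collection<String> topics = Arrays.asList("my-topic");        // placeholder

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

// Each ConsumerRecord carries the message timestamp alongside key and value.
JavaDStream<Long> eventTimes = stream.map(record -> record.timestamp());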


Thanks,
Kevin.


Add sqldriver.jar to Spark 1.6.0 executors

2016-09-14 Thread Kevin Tran
Hi Everyone,

I tried in cluster mode on YARN:
 * spark-submit --jars /path/sqldriver.jar
 * --driver-class-path
 * spark-env.sh
   SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/path/*"
 * spark-defaults.conf
   spark.driver.extraClassPath
   spark.executor.extraClassPath

None of them works for me!

Has anyone got a Spark app working with a driver jar on the executors before?
Please share your ideas. Thank you.
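
One combination that is often reported to work for JDBC drivers, shown here only
as a sketch (the deploy mode, paths, main class and application jar are all
placeholders; for the extraClassPath settings the jar must sit at the same path
on every node):

spark-submit \
  --master yarn --deploy-mode cluster \
  --jars /opt/jars/sqldriver.jar \
  --conf spark.driver.extraClassPath=/opt/jars/sqldriver.jar \
  --conf spark.executor.extraClassPath=/opt/jars/sqldriver.jar \
  --class com.example.MyApp myapp.jar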

Cheers,
Kevin.


Re: call() function being called 3 times

2016-09-07 Thread Kevin Tran
It turns out that the call() function runs in different stages:

...
2016-09-07 20:37:21,086 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Running task 0.0 in stage 11.0 (TID
11)
2016-09-07 20:37:21,087 [Executor task launch worker-0] DEBUG
org.apache.spark.executor.Executor - Task 11's epoch is 0
...
2016-09-07 20:37:21,096 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Finished task 0.0 in stage 11.0 (TID
11). 2412 bytes result sent to driver
...
<=== call() called here !!

2016-09-07 20:37:22,341 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Running task 0.0 in stage 12.0 (TID
12)
2016-09-07 20:37:22,343 [Executor task launch worker-0] DEBUG
org.apache.spark.executor.Executor - Task 12's epoch is 0

<=== call() called here !!

2016-09-07 20:37:22,362 [Executor task launch worker-0] INFO
 org.apache.spark.executor.Executor - Finished task 0.0 in stage 12.0 (TID
12). 2518 bytes result sent to driver


Does anyone have any ideas?
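
A guess at the cause, not something confirmed in this thread: every action re-runs
the RDD lineage, so the rdd.count() used for the debug line and the later action on
rowRDD each re-execute the upstream map(), which is why call() fires more than once.
A minimal sketch of caching the per-batch RDD inside foreachRDD so the actions share
one computation (names follow the snippet quoted below):

message.foreachRDD(rdd -> {
    rdd.cache();                                   // materialise once, reuse below
    logger.debug("---> New RDD with " + rdd.partitions().size()
        + " partitions and " + rdd.count() + " records");         // action 1
    // ... build rowRDD / the DataFrame and write it out here ...  // action 2
    rdd.unpersist();
});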

On Wed, Sep 7, 2016 at 7:30 PM, Kevin Tran <kevin...@gmail.com> wrote:

> Hi Everyone,
> Does anyone know why the call() function is being called *3 times* for each
> message that arrives?
>
> JavaDStream<String> message = messagesDStream.map(new
>>> Function<Tuple2<String, String>, String>() {
>>
>> @Override
>>
>> public String call(Tuple2<String, String> tuple2) {
>>
>> return tuple2._2();
>>
>> }
>>
>> });
>>
>>
>>>
>>
>> message.foreachRDD(rdd -> {
>>
>> logger.debug("---> New RDD with " + rdd.partitions().size() + "
>>> partitions and " + rdd.count() + " records");   *<== 1*
>>
>> SQLContext sqlContext = new SQLContext(rdd.context());
>>
>>
>> JavaRDD<JavaBean> rowRDD = rdd.map(new Function<String, JavaBean>() {
>>
>> public JavaBean call(String record) {
>>>   *<== being called 3 times*
>>
>>
>
> What I tried:
>  * *cache()*
>  * cleaning up *checkpoint dir*
>
> Thanks,
> Kevin.
>
>
>


call() function being called 3 times

2016-09-07 Thread Kevin Tran
Hi Everyone,
Does anyone know why the call() function is being called *3 times* for each
message that arrives?

JavaDStream<String> message = messagesDStream.map(new
>> Function<Tuple2<String, String>, String>() {
>
> @Override
>
> public String call(Tuple2<String, String> tuple2) {
>
> return tuple2._2();
>
> }
>
> });
>
>
>>
>
> message.foreachRDD(rdd -> {
>
> logger.debug("---> New RDD with " + rdd.partitions().size() + " partitions
>> and " + rdd.count() + " records");   *<== 1*
>
> SQLContext sqlContext = new SQLContext(rdd.context());
>
>
> JavaRDD<JavaBean> rowRDD = rdd.map(new Function<String, JavaBean>() {
>
> public JavaBean call(String record) {
>>   *<== being called 3 times*
>
>

What I tried:
 * *cache()*
 * cleaning up *checkpoint dir*

Thanks,
Kevin.


Re: Best ID Generator for ID field in parquet ?

2016-09-04 Thread Kevin Tran
Hi Mich,
Thank you for your input.
Does monotonically_increasing_id() guard against race conditions, or can it
duplicate the ids at some point with multiple threads, multiple instances, ...?

Even System.currentTimeMillis() can still produce duplicates, can't it?
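
For reference, monotonically_increasing_id() is documented to put the partition id
in the upper 31 bits and a per-partition record number in the lower 33 bits, so it
is unique within a single DataFrame but neither consecutive nor stable across runs.
A small sketch, not from this thread and with made-up names, of how the other two
candidates behave:

import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

public final class IdSketch {

    // System.currentTimeMillis() alone collides as soon as two records land in
    // the same millisecond. A per-JVM counter removes collisions inside one JVM only:
    private static final AtomicLong COUNTER = new AtomicLong();

    static String perJvmId() {
        // still not safe across executors unless a worker/partition id is folded in
        return System.currentTimeMillis() + "-" + COUNTER.getAndIncrement();
    }

    // UUID.randomUUID() is collision-free for all practical purposes across threads,
    // JVMs and restarts, which is why it is the usual low-effort default for an ID column.
    static String uuidId() {
        return UUID.randomUUID().toString();
    }
}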

Cheers,
Kevin.

On Mon, Sep 5, 2016 at 12:30 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> You can create a monotonically incrementing ID column on your table
>
> scala> val ll_18740868 = spark.table("accounts.ll_18740868")
> scala> val startval = 1
> scala> val df = ll_18740868.withColumn("id",
> *monotonically_increasing_id()+* startval).show (2)
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
> |transactiondate|transactiontype| sortcode|accountnumber|transactiondescription|debitamount|creditamount|balance| id|
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
> |     2011-12-30|            DEB|'30-64-72|     18740868|  WWW.GFT.COM CD 4628 |       50.0|        null| 304.89|  1|
> |     2011-12-30|            DEB|'30-64-72|     18740868|  TDA.CONFECC.D.FRE...|      19.01|        null| 354.89|  2|
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
>
>
> Now you have a new ID column
>
> HTH
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 4 September 2016 at 12:43, Kevin Tran <kevin...@gmail.com> wrote:
>
>> Hi everyone,
>> Please give me your opinions on what the best ID generator is for an ID
>> field in parquet?
>>
>> UUID.randomUUID();
>> AtomicReference currentTime = new AtomicReference<>(System.currentTimeMillis());
>> AtomicLong counter = new AtomicLong(0);
>> 
>>
>> Thanks,
>> Kevin.
>>
>>
>> 
>> https://issues.apache.org/jira/browse/SPARK-8406 (Race condition when
>> writing Parquet files)
>> https://github.com/apache/spark/pull/6864/files
>>
>
>


Best ID Generator for ID field in parquet ?

2016-09-04 Thread Kevin Tran
Hi everyone,
Please give me your opinions on what the best ID generator is for an ID field
in parquet?

UUID.randomUUID();
AtomicReference currentTime = new
AtomicReference<>(System.currentTimeMillis());
AtomicLong counter = new AtomicLong(0);


Thanks,
Kevin.



https://issues.apache.org/jira/browse/SPARK-8406 (Race condition when
writing Parquet files)
https://github.com/apache/spark/pull/6864/files


Re: Best practices for storing data in Parquet files

2016-08-28 Thread Kevin Tran
Hi Mich,
My stack is as follows:

Data sources:
 * IBM MQ
 * Oracle database

Kafka stores all messages from the data sources.
Spark Streaming fetches messages from Kafka, does a bit of transformation, and
writes parquet files to HDFS.
Hive / SparkSQL / Impala will query the parquet files.

Do you have any reference architecture that HBase is a part of?

Please share with me the best practices you might know or your favourite
designs.

Thanks,
Kevin.

On Mon, Aug 29, 2016 at 5:18 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> Can you explain your particular stack?
>
> For example, what is the source of the streaming data and what role does
> Spark play?
>
> Are you dealing with real time and batch, and why Parquet rather than
> something like HBase to ingest data in real time?
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 28 August 2016 at 15:43, Kevin Tran <kevin...@gmail.com> wrote:
>
>> Hi,
>> Does anyone know the best practices for storing data in parquet files?
>> Does a parquet file have a size limit (1 TB)?
>> Should we use SaveMode.Append for a long-running streaming app?
>> How should we store the data in HDFS (directory structure, ...)?
>>
>> Thanks,
>> Kevin.
>>
>
>


Best practices for storing data in Parquet files

2016-08-28 Thread Kevin Tran
Hi,
Does anyone know the best practices for storing data in parquet files?
Does a parquet file have a size limit (1 TB)?
Should we use SaveMode.Append for a long-running streaming app?
How should we store the data in HDFS (directory structure, ...)?
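
For the directory-structure part, a sketch of one common layout (df, the
event_date column and the path are assumptions for the example, not from this
thread): partitioning the write by a date column keeps per-directory file counts
bounded and lets Hive / Impala / Spark SQL prune partitions at query time.

import org.apache.spark.sql.SaveMode;

df.write()
  .mode(SaveMode.Append)                 // append per micro-batch for a long-running stream
  .partitionBy("event_date")             // -> .../event_date=2016-08-28/part-*.parquet
  .parquet("hdfs:///data/events");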

Thanks,
Kevin.


Spark StringType could hold how many characters ?

2016-08-28 Thread Kevin Tran
Hi,
I wrote to a parquet file as follows:

+--------------------------+
|                      word|
+--------------------------+
| THIS IS MY CHARACTERS ...|
|// ANOTHER LINE OF CHAC...|
+--------------------------+

These lines are not the full text; the strings appear to be trimmed.
Does anyone know how many characters a StringType value can hold?

In the Spark code:
org.apache.spark.sql.types.StringType
/**
   * The default size of a value of the StringType is 4096 bytes.
   */
  override def defaultSize: Int = 4096
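
The 4096 bytes above is only the planner's per-value size estimate, not a cap;
StringType itself has no fixed length limit. The "..." in the printed table most
likely comes from show(), which truncates each cell to 20 characters by default.
A quick check, assuming df is the DataFrame that was written:

// numRows = 20, truncate = false -> print full cell contents
df.select("word").show(20, false);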


Thanks,
Kevin.


Write parquet file from Spark Streaming

2016-08-27 Thread Kevin Tran
Hi Everyone,

Does anyone know how to write a parquet file after parsing data in Spark
Streaming?
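
A minimal sketch of one common pattern (the JavaDStream<String> messages, the
ParsedEvent bean, its parse() helper and the output path are all hypothetical):
convert each micro-batch to a DataFrame inside foreachRDD and append it as parquet.

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

messages.foreachRDD(rdd -> {
    if (!rdd.isEmpty()) {
        SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
        // ParsedEvent is a plain Java bean describing the parsed record's schema.
        DataFrame df = sqlContext.createDataFrame(
            rdd.map(line -> ParsedEvent.parse(line)),
            ParsedEvent.class);
        df.write().mode(SaveMode.Append).parquet("hdfs:///data/parsed");
    }
});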



Thanks,

Kevin.