Re: Spark app writes too many small parquet files
How many partitions should there be when streaming? In a streaming process the data keeps growing in size. Is there any configuration to cap the file size and roll over to a new file once it exceeds a threshold (let's say 128 MB per file)?

Another question is about query performance against these parquet files. What is the recommended practice for file size and file count? And how can many small parquet files be compacted into a smaller number of bigger parquet files?

Thanks,
Kevin.

On Tue, Nov 29, 2016 at 3:01 AM, Chin Wei Low <lowchin...@gmail.com> wrote:

> Try limiting the partitions: spark.sql.shuffle.partitions.
>
> This controls the number of files generated.
>
> On 28 Nov 2016 8:29 p.m., "Kevin Tran" <kevin...@gmail.com> wrote:
>
>> Hi Denny,
>> Thank you for your input. I also use 128 MB, but the Spark app still
>> generates too many files, each only ~14 KB! That's why I'm asking whether
>> there is a solution, in case someone has had the same issue.
>>
>> Cheers,
>> Kevin.
>>
>> On Mon, Nov 28, 2016 at 7:08 PM, Denny Lee <denny.g@gmail.com> wrote:
>>
>>> Generally, yes - you should aim for larger file sizes because of the
>>> overhead of opening files. Typical guidance is between 64 MB and 1 GB;
>>> personally I usually stick with 128 MB-512 MB with the default snappy
>>> codec compression for parquet. A good reference is Vida Ha's presentation
>>> Data Storage Tips for Optimal Spark Performance
>>> <https://spark-summit.org/2015/events/data-storage-tips-for-optimal-spark-performance/>.
>>>
>>> On Sun, Nov 27, 2016 at 9:44 PM Kevin Tran <kevin...@gmail.com> wrote:
>>>
>>>> Hi Everyone,
>>>> Does anyone know the best practice for writing parquet files from
>>>> Spark?
>>>>
>>>> When the Spark app writes data to parquet, the target directory ends
>>>> up with heaps of very small parquet files (such as
>>>> e73f47ef-4421-4bcc-a4db-a56b110c3089.parquet), each only about 15 KB.
>>>>
>>>> Should it write bigger chunks of data (such as 128 MB) with a proper
>>>> number of files?
>>>>
>>>> Has anyone observed performance changes when changing the data size of
>>>> each parquet file?
>>>>
>>>> Thanks,
>>>> Kevin.
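Two common patterns for keeping file counts and sizes under control are: lower spark.sql.shuffle.partitions so that shuffled writes produce fewer files, and run a periodic compaction job that reads the small files back and rewrites them through coalesce(). A minimal Scala sketch, assuming the Spark 2.x DataFrame API; the paths and partition counts are illustrative only:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("CompactParquet").getOrCreate()

    // Fewer shuffle partitions means fewer output files for shuffled writes.
    spark.conf.set("spark.sql.shuffle.partitions", "8")

    // Compaction: read the many small files, shrink the partition count,
    // and rewrite them as a small number of larger files.
    val small = spark.read.parquet("hdfs:///data/events")  // illustrative path
    small.coalesce(4)                                      // target file count
      .write
      .parquet("hdfs:///data/events_compacted")

Note that Spark of this era has no built-in "roll over at 128 MB" switch: the number of output files is the number of partitions at write time, so sizing files means sizing partitions.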
Re: Spark app writes too many small parquet files
Hi Denny,
Thank you for your input. I also use 128 MB, but the Spark app still generates too many files, each only ~14 KB! That's why I'm asking whether there is a solution, in case someone has had the same issue.

Cheers,
Kevin.

On Mon, Nov 28, 2016 at 7:08 PM, Denny Lee <denny.g@gmail.com> wrote:

> Generally, yes - you should aim for larger file sizes because of the
> overhead of opening files. Typical guidance is between 64 MB and 1 GB;
> personally I usually stick with 128 MB-512 MB with the default snappy
> codec compression for parquet. A good reference is Vida Ha's presentation
> Data Storage Tips for Optimal Spark Performance
> <https://spark-summit.org/2015/events/data-storage-tips-for-optimal-spark-performance/>.
>
> On Sun, Nov 27, 2016 at 9:44 PM Kevin Tran <kevin...@gmail.com> wrote:
>
>> Hi Everyone,
>> Does anyone know the best practice for writing parquet files from Spark?
>>
>> When the Spark app writes data to parquet, the target directory ends up
>> with heaps of very small parquet files (such as
>> e73f47ef-4421-4bcc-a4db-a56b110c3089.parquet), each only about 15 KB.
>>
>> Should it write bigger chunks of data (such as 128 MB) with a proper
>> number of files?
>>
>> Has anyone observed performance changes when changing the data size of
>> each parquet file?
>>
>> Thanks,
>> Kevin.
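Since the snappy default came up: the codec can also be pinned explicitly through Spark SQL configuration, so the on-disk format does not silently change between Spark versions. A one-line sketch, assuming a Spark 2.x session (on 1.6 the same key works via sqlContext.setConf):

    // Pin the parquet codec rather than relying on the version's default.
    spark.conf.set("spark.sql.parquet.compression.codec", "snappy")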
Spark app writes too many small parquet files
Hi Everyone,
Does anyone know the best practice for writing parquet files from Spark?

When the Spark app writes data to parquet, the target directory ends up with heaps of very small parquet files (such as e73f47ef-4421-4bcc-a4db-a56b110c3089.parquet), each only about 15 KB.

Should it write bigger chunks of data (such as 128 MB) with a proper number of files?

Has anyone observed performance changes when changing the data size of each parquet file?

Thanks,
Kevin.
Extract timestamp from Kafka message
Hi Everyone,
Does anyone know how we could extract the timestamp from a Kafka message in Spark Streaming?

JavaPairInputDStream<String, String> messagesDStream = KafkaUtils.createDirectStream(
    ssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topics
);

Thanks,
Kevin.
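For reference: the 0.8-style direct stream in the snippet above cannot surface a timestamp, because Kafka messages only gained timestamps with broker 0.10. With the spark-streaming-kafka-0-10 integration, each ConsumerRecord carries one. A sketch in Scala, assuming Spark 2.x, Kafka 0.10+ brokers, and an existing StreamingContext ssc; the broker address, group id, and topic name are illustrative:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "localhost:9092",            // illustrative
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "ts-demo"                    // illustrative
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("mytopic"), kafkaParams))

    // Each ConsumerRecord exposes the timestamp Kafka attached to the message.
    stream.foreachRDD { rdd =>
      rdd.foreach { record =>
        println(s"ts=${record.timestamp()} value=${record.value()}")
      }
    }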
Add sqldriver.jar to Spark 1.6.0 executors
Hi Everyone,
I tried the following in cluster mode on YARN:

* spark-submit --jars /path/sqldriver.jar
* spark-submit --driver-class-path
* spark-env.sh: SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/path/*"
* spark-defaults.conf: spark.driver.extraClassPath and spark.executor.extraClassPath

None of them works for me! Has anyone gotten a Spark app working with a driver jar on the executors before? Please share your ideas. Thank you.

Cheers,
Kevin.
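One combination that reportedly works in YARN cluster mode: let --jars ship the jar into each container's working directory, then reference it by bare file name in both extraClassPath settings (files distributed via --jars land in the container's working directory). A sketch; the application class and jar names are illustrative:

    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --jars /local/path/sqldriver.jar \
      --conf spark.driver.extraClassPath=sqldriver.jar \
      --conf spark.executor.extraClassPath=sqldriver.jar \
      --class com.example.MyApp \
      myapp.jar

For JDBC drivers in particular, the driver class often needs to be visible to the JVM's base class loader, which is why the extraClassPath entries matter in addition to --jars.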
Re: call() function being called 3 times
It turns out that the call() function runs in different stages ...

2016-09-07 20:37:21,086 [Executor task launch worker-0] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 11.0 (TID 11)
2016-09-07 20:37:21,087 [Executor task launch worker-0] DEBUG org.apache.spark.executor.Executor - Task 11's epoch is 0
...
2016-09-07 20:37:21,096 [Executor task launch worker-0] INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 11.0 (TID 11). 2412 bytes result sent to driver
...   <=== call() called here !!
2016-09-07 20:37:22,341 [Executor task launch worker-0] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 12.0 (TID 12)
2016-09-07 20:37:22,343 [Executor task launch worker-0] DEBUG org.apache.spark.executor.Executor - Task 12's epoch is 0   <=== call() called here !!
2016-09-07 20:37:22,362 [Executor task launch worker-0] INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 12.0 (TID 12). 2518 bytes result sent to driver

Does anyone have any ideas?

On Wed, Sep 7, 2016 at 7:30 PM, Kevin Tran <kevin...@gmail.com> wrote:

> Hi Everyone,
> Does anyone know why the call() function is being called *3 times* for
> each message that arrives?
>
> JavaDStream<String> message = messagesDStream.map(new
>     Function<Tuple2<String, String>, String>() {
>   @Override
>   public String call(Tuple2<String, String> tuple2) {
>     return tuple2._2();
>   }
> });
>
> message.foreachRDD(rdd -> {
>   logger.debug("---> New RDD with " + rdd.partitions().size()
>       + " partitions and " + rdd.count() + " records");   *<== 1*
>   SQLContext sqlContext = new SQLContext(rdd.context());
>   JavaRDD<JavaBean> rowRDD = rdd.map(new Function<String, JavaBean>() {
>     public JavaBean call(String record) {   *<== being called 3 times*
>
> What I tried:
> * *cache()*
> * cleaning up the *checkpoint dir*
>
> Thanks,
> Kevin.
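One explanation consistent with these logs: RDD transformations are lazy and re-evaluated once per action. In the quoted code, the rdd.count() inside the debug line is one action, and whatever later materializes rowRDD adds more, so each action replays the map() lineage and re-invokes call(). Persisting before the first action avoids the replay. A minimal Scala sketch, assuming a DStream[String] named lines; all names are illustrative:

    val parsed = lines.map { s => println(s"call() for $s"); s.toUpperCase }

    parsed.foreachRDD { rdd =>
      // Without persist(), each action below replays the map() lineage,
      // so the side effect in call() fires once per action.
      rdd.persist()
      val n      = rdd.count()  // action 1: computes and caches the partitions
      val sample = rdd.take(1)  // action 2: served from the cache, no replay
      rdd.unpersist()
    }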
call() function being called 3 times
Hi Everyone,
Does anyone know why the call() function is being called *3 times* for each message that arrives?

JavaDStream<String> message = messagesDStream.map(new Function<Tuple2<String, String>, String>() {
  @Override
  public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
  }
});

message.foreachRDD(rdd -> {
  logger.debug("---> New RDD with " + rdd.partitions().size() + " partitions and " + rdd.count() + " records");   *<== 1*
  SQLContext sqlContext = new SQLContext(rdd.context());
  JavaRDD<JavaBean> rowRDD = rdd.map(new Function<String, JavaBean>() {
    public JavaBean call(String record) {   *<== being called 3 times*

What I tried:
* *cache()*
* cleaning up the *checkpoint dir*

Thanks,
Kevin.
Re: Best ID Generator for ID field in parquet?
Hi Mich,
Thank you for your input.

Does monotonically_increasing_id guard against race conditions, and could it produce duplicate ids at some point with multiple threads, multiple instances, ...? Even System.currentTimeMillis() can still produce duplicates.

Cheers,
Kevin.

On Mon, Sep 5, 2016 at 12:30 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> You can create a monotonically increasing ID column on your table:
>
> scala> val ll_18740868 = spark.table("accounts.ll_18740868")
> scala> val startval = 1
> scala> val df = ll_18740868.withColumn("id",
>        monotonically_increasing_id() + startval).show(2)
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
> |transactiondate|transactiontype| sortcode|accountnumber|transactiondescription|debitamount|creditamount|balance| id|
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
> |     2011-12-30|            DEB|'30-64-72|     18740868|  WWW.GFT.COM CD 4628 |       50.0|        null| 304.89|  1|
> |     2011-12-30|            DEB|'30-64-72|     18740868|  TDA.CONFECC.D.FRE...|      19.01|        null| 354.89|  2|
> +---------------+---------------+---------+-------------+----------------------+-----------+------------+-------+---+
>
> Now you have a new ID column.
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
> On 4 September 2016 at 12:43, Kevin Tran <kevin...@gmail.com> wrote:
>
>> Hi everyone,
>> Please give me your opinions on what is the best ID generator for an ID
>> field in parquet?
>>
>> UUID.randomUUID();
>> AtomicReference<Long> currentTime = new AtomicReference<>(System.currentTimeMillis());
>> AtomicLong counter = new AtomicLong(0);
>>
>> Thanks,
>> Kevin.
>>
>> https://issues.apache.org/jira/browse/SPARK-8406 (Race condition when
>> writing Parquet files)
>> https://github.com/apache/spark/pull/6864/files
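On the race-condition question: monotonically_increasing_id() is computed deterministically per partition (the current implementation packs the partition id into the upper 31 bits and a per-partition row counter into the lower 33 bits), so there is no race within a single job, but the ids repeat across separate runs and separate datasets. When ids must stay unique across runs, a random UUID per row is one common fallback. A sketch, assuming Spark 2.x and an existing DataFrame df:

    import org.apache.spark.sql.functions.{monotonically_increasing_id, udf}

    // Unique within this one job: partition id in the high bits, row
    // counter in the low bits. Not unique across reruns of the job.
    val withId = df.withColumn("id", monotonically_increasing_id())

    // Globally unique across runs, at the cost of a random, non-sortable id.
    // Persist before reuse: a recomputation would generate fresh UUIDs.
    val uuid = udf(() => java.util.UUID.randomUUID().toString)
    val withUuid = df.withColumn("id", uuid())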
Best ID Generator for ID field in parquet?
Hi everyone,
Please give me your opinions on what is the best ID generator for an ID field in parquet?

UUID.randomUUID();
AtomicReference<Long> currentTime = new AtomicReference<>(System.currentTimeMillis());
AtomicLong counter = new AtomicLong(0);

Thanks,
Kevin.

https://issues.apache.org/jira/browse/SPARK-8406 (Race condition when writing Parquet files)
https://github.com/apache/spark/pull/6864/files
Re: Best practices for storing data in Parquet files
Hi Mich,
My stack is as follows:

Data sources:
* IBM MQ
* Oracle database

Kafka stores all messages from the data sources. Spark Streaming fetches messages from Kafka, does a bit of transformation, and writes parquet files to HDFS. Hive / SparkSQL / Impala then query the parquet files.

Do you have any reference architecture in which HBase is a part? Please share the best practices you might know, or your favourite designs.

Thanks,
Kevin.

On Mon, Aug 29, 2016 at 5:18 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Hi,
>
> Can you explain your particular stack?
>
> For example, what is the source of the streaming data, and what role does
> Spark play?
>
> Are you dealing with real time and batch, and why Parquet and not
> something like HBase to ingest data in real time?
>
> HTH
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
> On 28 August 2016 at 15:43, Kevin Tran <kevin...@gmail.com> wrote:
>
>> Hi,
>> Does anyone know the best practices for storing data in parquet files?
>> Does a parquet file have a size limit (1 TB)?
>> Should we use SaveMode.Append for a long-running streaming app?
>> How should we lay the files out in HDFS (directory structure, ...)?
>>
>> Thanks,
>> Kevin.
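On the directory-structure question: the common layout for this Kafka-to-HDFS pattern is a date-partitioned parquet directory written with append mode, so Hive, SparkSQL, and Impala can prune partitions at query time. A sketch, assuming Spark 2.x and a DataFrame events that already carries year/month/day columns; the path is illustrative:

    import org.apache.spark.sql.SaveMode

    // partitionBy() yields one directory per value, e.g.
    // /warehouse/events/year=2016/month=08/day=28/part-*.parquet,
    // and Append lets a long-running streaming job keep adding files.
    events.write
      .mode(SaveMode.Append)
      .partitionBy("year", "month", "day")
      .parquet("hdfs:///warehouse/events")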
Best practices for storing data in Parquet files
Hi,
Does anyone know the best practices for storing data in parquet files?
Does a parquet file have a size limit (1 TB)?
Should we use SaveMode.Append for a long-running streaming app?
How should we lay the files out in HDFS (directory structure, ...)?

Thanks,
Kevin.
How many characters can Spark StringType hold?
Hi,
I wrote to a parquet file as follows:

+--------------------------+
|word                      |
+--------------------------+
|THIS IS MY CHARACTERS ... |
|// ANOTHER LINE OF CHAC...|
+--------------------------+

These lines are not the full text; they are being trimmed down. Does anyone know how many characters StringType can handle?

In the Spark code, org.apache.spark.sql.types.StringType has:

/**
 * The default size of a value of the StringType is 4096 bytes.
 */
override def defaultSize: Int = 4096

Thanks,
Kevin.
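The defaultSize of 4096 bytes is only the planner's per-value estimate used for memory statistics, not a storage cap; StringType values have no fixed length limit. The trimming shown above is almost certainly display truncation: show() cuts each cell to 20 characters by default. A one-line sketch, assuming a DataFrame df:

    // numRows = 20, truncate = false: prints the full cell contents.
    df.select("word").show(20, false)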
Write parquet file from Spark Streaming
Hi Everyone,
Does anyone know how to write a parquet file after parsing data in Spark Streaming?

Thanks,
Kevin.
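A minimal sketch of the usual pattern, assuming Spark 2.x, a DStream[String] named lines, and comma-separated input; the case class, the parsing step, and the output path are all illustrative:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    case class Event(id: String, value: String)

    lines.foreachRDD { rdd =>
      val spark = SparkSession.builder
        .config(rdd.sparkContext.getConf)
        .getOrCreate()
      import spark.implicits._

      // Parse each line of the micro-batch into an Event.
      val events = rdd.map { line =>
        val parts = line.split(",")
        Event(parts(0), parts(1))
      }.toDF()

      // Append every micro-batch to the same parquet directory.
      events.write.mode(SaveMode.Append).parquet("hdfs:///data/events")
    }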