Re: Saving Parquet files to S3
Hi Ankur,

I also tried setting a property to write Parquet files of 256 MB. I am using PySpark; below is how I set the property, but it is not working for me. How did you set the property?

spark_context._jsc.hadoopConfiguration().setInt("dfs.blocksize", 268435456)
spark_context._jsc.hadoopConfiguration().setInt("parquet.block.size", 268435)

Thanks,
Bijay

On Fri, Jun 10, 2016 at 5:24 AM, Ankur Jain wrote:
> Thanks maropu. It worked.
>
> From: Takeshi Yamamuro [mailto:linguin@gmail.com]
> Sent: 10 June 2016 11:47 AM
> To: Ankur Jain
> Cc: user@spark.apache.org
> Subject: Re: Saving Parquet files to S3
>
> Hi,
>
> You'd be better off setting `parquet.block.size`.
>
> // maropu
>
> On Thu, Jun 9, 2016 at 7:48 AM, Daniel Siegmann <daniel.siegm...@teamaol.com> wrote:
>> I don't believe there's any way to output files of a specific size. What
>> you can do is partition your data into a number of partitions such that
>> the amount of data each contains is around 1 GB.
>
> On Thu, Jun 9, 2016 at 7:51 AM, Ankur Jain wrote:
>> Hello Team,
>>
>> I want to write Parquet files to AWS S3, but I want to size each file
>> to 1 GB.
>> Can someone please guide me on how I can achieve the same?
>>
>> I am using AWS EMR with Spark 1.6.1.
>>
>> Thanks,
>> Ankur
>>
>> Information transmitted by this e-mail is proprietary to YASH Technologies
>> and/or its Customers and is intended for use only by the individual or
>> entity to which it is addressed, and may contain information that is
>> privileged, confidential or exempt from disclosure under applicable law.
>> If you are not the intended recipient or it appears that this mail has
>> been forwarded to you without proper authority, you are notified that any
>> use or dissemination of this information in any manner is strictly
>> prohibited. In such cases, please notify us immediately at i...@yash.com
>> and delete this mail from your records.
>
> --
> ---
> Takeshi Yamamuro
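Two things are worth noting from the thread above. First, in the quoted snippet the second setInt value is 268435 (about 262 KB), not 268435456 (256 MB), which by itself would explain small row groups. Second, as Daniel points out, Spark (1.x) has no direct "target output file size" setting; the usual workaround is to repartition so each task writes roughly the desired amount. A hedged sketch, where the helper name and example sizes are illustrative and not from the thread:

```python
# Hedged sketch: estimate a partition count so each output file is roughly
# the target size, then repartition before writing. The helper name and the
# example sizes are illustrative assumptions, not from the thread.
import math

def partitions_for_target_size(total_bytes, target_bytes_per_file):
    """Smallest partition count giving files no larger than the target."""
    return max(1, int(math.ceil(total_bytes / float(target_bytes_per_file))))

# e.g. ~50 GB of input aiming for ~1 GB files -> 50 partitions
n = partitions_for_target_size(50 * 2**30, 1 * 2**30)

# Then, in PySpark (not executed here):
#   df.repartition(n).write.parquet("s3://bucket/path/")
```

The resulting files will still vary in size with compression and row width, so the target is approximate.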
Re: Error joining dataframes
Hi,

Try this one:

df_join = df1.join(df2, 'Id', "fullouter")

Thanks,
Bijay

On Tue, May 17, 2016 at 9:39 AM, ram kumar wrote:
> Hi,
>
> I tried to join two dataframes:
>
> df_join = df1.join(df2, df1("Id") === df2("Id"), "fullouter")
>
> df_join.registerTempTable("join_test")
>
> When querying "Id" from "join_test":
>
> 0: jdbc:hive2://> select Id from join_test;
> Error: org.apache.spark.sql.AnalysisException: Reference 'Id' is
> ambiguous, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>
> Is there a way to merge the values of df1("Id") and df2("Id") into one "Id"?
>
> Thanks
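The ambiguity goes away because passing the key as a column name produces a single merged `Id` column, while joining on an expression keeps both `df1("Id")` and `df2("Id")`. A minimal pure-Python illustration (not Spark; helper and data are made up) of what the merged-key full outer join yields:

```python
# Pure-Python illustration (not Spark) of a full outer join keyed on 'Id':
# each output row carries exactly one 'Id' field, which is what
# df1.join(df2, 'Id', 'fullouter') gives, unlike joining on an expression.
def full_outer_join(left, right, key):
    by_key_l = {row[key]: row for row in left}
    by_key_r = {row[key]: row for row in right}
    out = []
    for k in sorted(set(by_key_l) | set(by_key_r)):
        row = {key: k}
        row.update({c: v for c, v in by_key_l.get(k, {}).items() if c != key})
        row.update({c: v for c, v in by_key_r.get(k, {}).items() if c != key})
        out.append(row)
    return out

rows = full_outer_join([{"Id": 1, "a": "x"}], [{"Id": 2, "b": "y"}], "Id")
```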
Disable parquet metadata summary in PySpark
Hi,

How can we disable writing _common_metadata while saving a DataFrame in Parquet format in PySpark? I tried to set the property using the command below, but it didn't help.

sparkContext._jsc.hadoopConfiguration().set("parquet.enable.summary-metadata", "false")

Thanks,
Bijay
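For reference, the same Hadoop property can usually be supplied without touching the private `_jsc` handle, via the `spark.hadoop.*` prefix on the Spark configuration. Treat this as a hedged alternative rather than a guaranteed fix for the summary files:

```python
# Hedged sketch: passing the Hadoop property through Spark configuration
# using the spark.hadoop.* prefix, instead of the private _jsc handle.
conf_pairs = {
    "spark.hadoop.parquet.enable.summary-metadata": "false",
}

# In PySpark (not executed here):
#   from pyspark import SparkConf, SparkContext
#   sc = SparkContext(conf=SparkConf().setAll(conf_pairs.items()))
```

Setting the property before the context is created matters, since output committers may read it once at job setup.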
Re: SqlContext parquet read OutOfMemoryError: Requested array size exceeds VM limit error
Hi,

Thanks for the suggestions and links. The problem arises when I use the DataFrame API to write, but it works fine when doing an insert overwrite into the Hive table.

# Works fine
hive_context.sql("insert overwrite table {0} partition (e_dt, c_dt) select * from temp_table".format(table_name))

# Doesn't work; throws java.lang.OutOfMemoryError: Requested array size exceeds VM limit
df.write.mode('overwrite').partitionBy('e_dt', 'c_dt').parquet("/path/to/file/")

Thanks,
Bijay

On Wed, May 4, 2016 at 3:02 PM, Prajwal Tuladhar wrote:
> If you are running on a 64-bit JVM with less than 32G heap, you might want
> to enable -XX:+UseCompressedOops [1]. And if your dataframe is somehow
> generating an array with more than 2^31-1 elements, you might have to
> rethink your options.
>
> [1] https://spark.apache.org/docs/latest/tuning.html
>
> On Wed, May 4, 2016 at 9:44 PM, Bijay Kumar Pathak wrote:
>> Hi,
>>
>> I am reading a Parquet file of around 50+ GB which has 4013 partitions
>> with 240 columns. Below is my configuration:
>>
>> driver: 20G memory with 4 cores
>> executors: 45 executors with 15G memory and 4 cores
>>
>> I tried to read the data both with a DataFrame read and with a Hive
>> context using Hive SQL, but in both cases it throws the error below with
>> no further description.
>>
>> hive_context.sql("select * from test.base_table where date='{0}'".format(part_dt))
>> sqlcontext.read.parquet("/path/to/partition/")
>>
>> #
>> # java.lang.OutOfMemoryError: Requested array size exceeds VM limit
>> # -XX:OnOutOfMemoryError="kill -9 %p"
>> # Executing /bin/sh -c "kill -9 16953"...
>>
>> What could be wrong here? I think increasing memory alone will not help,
>> since it reached the array size limit.
>>
>> Thanks,
>> Bijay
>
> --
> Cheers,
> Praj
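One plausible reason the `partitionBy` path fails while the Hive insert succeeds: in Spark 1.x each write task may keep one open Parquet writer per distinct partition value it encounters, and every open writer buffers roughly a row group in memory. A hedged back-of-envelope estimate (the helper and numbers are illustrative assumptions, not measurements from this job):

```python
# Hedged back-of-envelope: peak write-side buffering per task when using
# partitionBy, assuming one open Parquet writer per distinct (e_dt, c_dt)
# value seen by the task and ~one buffered row group per writer.
def peak_write_buffer_bytes(partitions_seen_per_task, row_group_bytes):
    return partitions_seen_per_task * row_group_bytes

# e.g. 100 partition values landing on one task with 128 MB row groups
est = peak_write_buffer_bytes(100, 128 * 2**20)  # ~12.5 GB

# Mitigation sketch (not executed here): cluster rows by partition key first,
# so each task writes only a few partitions:
#   df.repartition('e_dt', 'c_dt') \
#     .write.mode('overwrite').partitionBy('e_dt', 'c_dt').parquet(path)
```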
SqlContext parquet read OutOfMemoryError: Requested array size exceeds VM limit error
Hi,

I am reading a Parquet file of around 50+ GB which has 4013 partitions with 240 columns. Below is my configuration:

driver: 20G memory with 4 cores
executors: 45 executors with 15G memory and 4 cores

I tried to read the data both with a DataFrame read and with a Hive context using Hive SQL, but in both cases it throws the error below with no further description.

hive_context.sql("select * from test.base_table where date='{0}'".format(part_dt))
sqlcontext.read.parquet("/path/to/partition/")

#
# java.lang.OutOfMemoryError: Requested array size exceeds VM limit
# -XX:OnOutOfMemoryError="kill -9 %p"
# Executing /bin/sh -c "kill -9 16953"...

What could be wrong here? I think increasing memory alone will not help, since it reached the array size limit.

Thanks,
Bijay
Re: Performance with Insert overwrite into Hive Table.
Thanks Ted. This looks like the issue, since I am running it on EMR and the Hive version is 1.0.0.

Thanks,
Bijay

On Wed, May 4, 2016 at 10:29 AM, Ted Yu wrote:
> Looks like you were hitting HIVE-11940.
>
> On Wed, May 4, 2016 at 10:02 AM, Bijay Kumar Pathak wrote:
>> Hello,
>>
>> I am writing a DataFrame of around 60+ GB into a partitioned Hive table
>> using hiveContext in Parquet format. The Spark insert overwrite job
>> completes in a reasonable amount of time, around 20 minutes.
>>
>> But the job is taking a huge amount of time, more than 2 hours, to copy
>> data from the .hive-staging directory in HDFS to the final partition
>> directory. What could be the potential problem here?
>>
>> hive_c.sql("""
>>     INSERT OVERWRITE TABLE {0} PARTITION (row_eff_end_dt='{1}', ccd_dt)
>>     SELECT * from temp_table
>> """.format(table_name, eff_end_dt))
>>
>> And the process below from the log is taking more than 2 hours.
>>
>> 16/05/04 06:41:28 INFO Hive: Replacing
>> src:hdfs://internal:8020/user/hadoop/so_core_us/.hive-staging_hive_2016-05-04_04-39-13_992_6600245407573569189-1/-ext-1/ccd_dt=2012-09-02/part-00306,
>> dest: hdfs://internal:8020/user/hadoop/so_core_us/row_eff_end_dt=-12-31/ccd_dt=2012-09-02/part-00306,
>> Status:true
>> 16/05/04 06:41:28 INFO Hive: New loading path =
>> hdfs://internal:8020/user/hadoop/so_core_us/.hive-staging_hive_2016-05-04_04-39-13_992_6600245407573569189-1/-ext-1/ccd_dt=2012-09-02
>> with partSpec {row_eff_end_dt=-12-31, ccd_dt=2012-09-02}
>>
>> Thanks,
>> Bijay
Performance with Insert overwrite into Hive Table.
Hello,

I am writing a DataFrame of around 60+ GB into a partitioned Hive table using hiveContext in Parquet format. The Spark insert overwrite job completes in a reasonable amount of time, around 20 minutes.

But the job is taking a huge amount of time, more than 2 hours, to copy data from the .hive-staging directory in HDFS to the final partition directory. What could be the potential problem here?

hive_c.sql("""
    INSERT OVERWRITE TABLE {0} PARTITION (row_eff_end_dt='{1}', ccd_dt)
    SELECT * from temp_table
""".format(table_name, eff_end_dt))

And the process below from the log is taking more than 2 hours.

16/05/04 06:41:28 INFO Hive: Replacing
src:hdfs://internal:8020/user/hadoop/so_core_us/.hive-staging_hive_2016-05-04_04-39-13_992_6600245407573569189-1/-ext-1/ccd_dt=2012-09-02/part-00306,
dest: hdfs://internal:8020/user/hadoop/so_core_us/row_eff_end_dt=-12-31/ccd_dt=2012-09-02/part-00306,
Status:true
16/05/04 06:41:28 INFO Hive: New loading path =
hdfs://internal:8020/user/hadoop/so_core_us/.hive-staging_hive_2016-05-04_04-39-13_992_6600245407573569189-1/-ext-1/ccd_dt=2012-09-02
with partSpec {row_eff_end_dt=-12-31, ccd_dt=2012-09-02}

Thanks,
Bijay
Re: Dataframe saves for a large set but throws OOM for a small dataset
Hi,

I was facing the same issue on Spark 1.6. My data size was around 100 GB and I was writing into a partitioned Hive table. I was able to solve this issue by starting from 6G of memory per executor and going up to 15G with an overhead of 2G, and by partitioning the DataFrame before doing the insert overwrite into the Hive table. From my experience, Parquet puts a lot of memory pressure on the executor, so try increasing your executor memory. Here are the relevant JIRA tickets:

https://issues.apache.org/jira/browse/SPARK-8890
https://issues.apache.org/jira/browse/PARQUET-222

Thanks,
Bijay

On Sat, Apr 30, 2016 at 1:52 PM, Brandon White wrote:
> randomSplit instead of randomSample
>
> On Apr 30, 2016 1:51 PM, "Brandon White" wrote:
>> val df = globalDf
>> val filteredDfs = filterExpressions.map { expr =>
>>   val filteredDf = df.filter(expr)
>>   val samples = filteredDf.randomSample([.7, .3])
>>   (samples(0), samples(1))
>> }
>>
>> val largeDfs = filteredDfs.map(_._1)
>> val smallDfs = filteredDfs.map(_._2)
>>
>> val unionedLargeDfs = tailRecursiveUnionAll(largeDfs.tail, largeDfs.head)
>> val unionedSmallDfs = tailRecursiveUnionAll(smallDfs.tail, smallDfs.head)
>>
>> unionedLargeDfs.write.parquet(output) // works fine
>> unionedSmallDfs.write.parquet(output) // breaks with OOM stack trace in first thread
>>
>> There is no skew here. I am using Spark 1.5.1 with 80 executors with 7g memory.
>>
>> On Apr 30, 2016 1:22 PM, "Ted Yu" wrote:
>>> Can you provide a bit more information:
>>>
>>> Does the smaller dataset have skew?
>>> Which release of Spark are you using?
>>> How much memory did you specify?
>>>
>>> Thanks
>>>
>>> On Sat, Apr 30, 2016 at 1:17 PM, Brandon White wrote:
>>>> Hello,
>>>>
>>>> I am writing two datasets. One dataset is 2x larger than the other.
>>>> Both datasets are written to Parquet in exactly the same way using
>>>>
>>>> df.write.mode("Overwrite").parquet(outputFolder)
>>>>
>>>> The smaller dataset OOMs while the larger dataset writes perfectly fine.
Here is the stack trace. Any ideas what is going on here and how I can fix it?

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2367)
    at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
    at java.lang.StringBuilder.append(StringBuilder.java:132)
    at scala.StringContext.standardInterpolator(StringContext.scala:123)
    at scala.StringContext.s(StringContext.scala:90)
    at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:947)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:304)
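For context on Brandon's correction above, `randomSplit` assigns each row independently to one of the output frames with probability proportional to the weights. A pure-Python illustration (not Spark; the helper and seed are made up):

```python
# Pure-Python illustration (not Spark) of the weighted split that
# DataFrame.randomSplit([0.7, 0.3]) performs: each row independently lands
# in one bucket with probability proportional to the weights.
import random

def random_split(rows, weights, seed=42):
    rng = random.Random(seed)
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    buckets = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, b in enumerate(bounds):
            if x < b:
                buckets[i].append(row)
                break
        else:
            buckets[-1].append(row)  # guard against float rounding at the edge
    return buckets

large, small = random_split(list(range(1000)), [0.7, 0.3])
```

Note the split is probabilistic, so the resulting sizes only approximate the 70/30 ratio.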
Re: Spark SQL insert overwrite table not showing all the partition.
Hi Zhan,

I tried with the IF NOT EXISTS clause, and still I cannot see the first partition; only the partition from the last insert overwrite is present in the table.

Thanks,
Bijay

On Thu, Apr 21, 2016 at 11:18 PM, Zhan Zhang wrote:
> INSERT OVERWRITE will overwrite any existing data in the table or partition
> - unless IF NOT EXISTS is provided for a partition (as of Hive 0.9.0
>   <https://issues.apache.org/jira/browse/HIVE-2612>).
>
> Thanks.
>
> Zhan Zhang
>
> On Apr 21, 2016, at 3:20 PM, Bijay Kumar Pathak wrote:
>> Hi,
>>
>> I have a job which writes to a Hive table with dynamic partitions. Inside
>> the job, I am writing into the table twice, but I only see the partition
>> from the last write, although I can see in the Spark UI that it is
>> processing data for both partitions.
>>
>> Below is the query I am using to write to the table.
>>
>> hive_c.sql("""INSERT OVERWRITE TABLE base_table PARTITION (date='{0}', date_2)
>>               SELECT * from temp_table
>>            """.format(date_val))
>>
>> Thanks,
>> Bijay
Spark SQL insert overwrite table not showing all the partition.
Hi,

I have a job which writes to a Hive table with dynamic partitions. Inside the job, I am writing into the table twice, but I only see the partition from the last write, although I can see in the Spark UI that it is processing data for both partitions.

Below is the query I am using to write to the table.

hive_c.sql("""INSERT OVERWRITE TABLE base_table PARTITION (date='{0}', date_2)
              SELECT * from temp_table
           """.format(date_val))

Thanks,
Bijay
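Because INSERT OVERWRITE replaces the target partition each time, two separate overwrites that share the same static `date` spec leave only the second write visible, which matches the symptom above. One hedged workaround sketch (the table and partition names follow the thread, but the temp-table names and union shape are assumptions) is to combine both inputs and overwrite once:

```python
# Hedged sketch: build a single INSERT OVERWRITE over the union of both
# inputs so the second write cannot replace the first. The temp-table names
# are hypothetical.
def build_insert(table, static_date):
    return (
        "INSERT OVERWRITE TABLE {0} PARTITION (date='{1}', date_2) "
        "SELECT * FROM temp_table_a UNION ALL SELECT * FROM temp_table_b"
    ).format(table, static_date)

sql = build_insert("base_table", "2016-04-21")
# hive_c.sql(sql)  # not executed here
```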
Reading conf file in Pyspark in cluster mode
Hello,

I have Spark jobs packaged in a zip and deployed in cluster mode on AWS EMR. The job has to read a conf file packaged with the zip under the resources directory. I can read the conf file in client mode but not in cluster mode. How do I read the conf file packaged in the zip while deploying in cluster mode?

Thanks,
Bijay
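A common pattern for this (hedged; the file and app names below are placeholders, not from the message) is to ship the conf file explicitly with `--files`, which YARN localizes next to each container in cluster mode, and then resolve it with `SparkFiles.get`:

```python
# Hedged sketch: submit command built as an argv list; --files ships the
# conf file so it is localized on the driver and executor nodes in cluster
# mode. File and app names are placeholders.
submit_cmd = [
    "spark-submit", "--deploy-mode", "cluster",
    "--py-files", "app.zip",
    "--files", "app.conf",
    "main.py",
]

# Inside the job (PySpark, not executed here):
#   from pyspark import SparkFiles
#   path = SparkFiles.get("app.conf")
```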
Re: Connection closed Exception.
Hi Rodrick,

I had tried increasing memory from 6G to 9G to 12G, but I am still getting the same error. The size of the DataFrame I am trying to write is around 6-7 GB, and the Hive table is in Parquet format.

Thanks,
Bijay

On Mon, Apr 11, 2016 at 4:03 AM, Rodrick Brown wrote:
> Try increasing the memory allocated for this job.
>
> Sent from Outlook for iPhone
>
> On Sun, Apr 10, 2016 at 9:12 PM -0700, "Bijay Kumar Pathak" <bkpat...@mtu.edu> wrote:
>> Hi,
>>
>> I am running Spark 1.6 on EMR. I have a workflow which does the following:
>>
>> 1. Read the two flat files, create data frames, and join them.
>> 2. Read the particular partition from the Hive table and join the
>>    dataframe from 1 with it.
>> 3. Finally, insert overwrite into a Hive table which is partitioned on
>>    two fields.
>>
>> The stdout log message in the terminal when I submit the job shows the
>> message below.
>>
>> #
>> # java.lang.OutOfMemoryError: Java heap space
>> # -XX:OnOutOfMemoryError="kill -9 %p"
>> # Executing /bin/sh -c "kill -9 30149"...
>> Killed
>>
>> And when I check the YARN logs, they show the error below. The Spark UI
>> doesn't show any failed stages or tasks, but the job gets stuck in the
>> middle without completing all the stages. Did anyone come across similar
>> issues? What could be the reason behind it and how could I troubleshoot it?
>>
>> 16/04/11 00:19:38 ERROR client.TransportResponseHandler: Still have 1
>> requests outstanding when connection from
>> ip-10-184-195-29.ec2.internal/10.184.195.29:43162 is closed
>> 16/04/11 00:19:38 WARN executor.CoarseGrainedExecutorBackend: An unknown
>> (ip-10-184-195-29.ec2.internal:43162) driver disconnected.
>> 16/04/11 00:19:38 ERROR executor.CoarseGrainedExecutorBackend: Driver
>> 10.184.195.29:43162 disassociated! Shutting down.
>> 16/04/11 00:19:38 WARN netty.NettyRpcEndpointRef: Error sending message
>> [message = Heartbeat(12,[Lscala.Tuple2;@6545df9a,BlockManagerId(12,
>> ip-10-184-194-43.ec2.internal, 43867))] in 1 attempts
>> java.io.IOException: Connection from ip-10-184-195-29.ec2.internal/10.184.195.29:43162 closed
>>     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
>>     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
>>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
>>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
>>     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
>>     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
>>     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
>>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
>>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>     at java.lang.Thread.run(Thread.java:745)
>> 16/04/11 00:19:38 INFO storage.DiskBlockManager: Shutdown hook called
>> 16/04/11 00:19:38 INFO ut
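On YARN, a heap OOM followed by "connection closed" and heartbeat errors usually means one JVM died and the remaining processes lost their peer. Besides executor heap, the off-heap overhead allowance can matter, since YARN kills containers that exceed heap plus overhead. A hedged sketch of the relevant Spark 1.6 submit flags (values are illustrative, not a recommendation):

```python
# Hedged sketch: submit flags as an argv list; values are illustrative.
# spark.yarn.executor.memoryOverhead (MB) sizes the off-heap allowance that
# counts toward the YARN container limit in Spark 1.x.
submit_args = [
    "spark-submit", "--deploy-mode", "cluster",
    "--driver-memory", "8g",
    "--executor-memory", "9g",
    "--conf", "spark.yarn.executor.memoryOverhead=2048",
    "main.py",
]
```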
Connection closed Exception.
Hi,

I am running Spark 1.6 on EMR. I have a workflow which does the following:

1. Read the two flat files, create data frames, and join them.
2. Read the particular partition from the Hive table and join the dataframe from 1 with it.
3. Finally, insert overwrite into a Hive table which is partitioned on two fields.

The stdout log message in the terminal when I submit the job shows the message below.

#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
# Executing /bin/sh -c "kill -9 30149"...
Killed

And when I check the YARN logs, they show the error below. The Spark UI doesn't show any failed stages or tasks, but the job gets stuck in the middle without completing all the stages. Did anyone come across similar issues? What could be the reason behind it and how could I troubleshoot it?

16/04/11 00:19:38 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from ip-10-184-195-29.ec2.internal/10.184.195.29:43162 is closed
16/04/11 00:19:38 WARN executor.CoarseGrainedExecutorBackend: An unknown (ip-10-184-195-29.ec2.internal:43162) driver disconnected.
16/04/11 00:19:38 ERROR executor.CoarseGrainedExecutorBackend: Driver 10.184.195.29:43162 disassociated! Shutting down.
16/04/11 00:19:38 WARN netty.NettyRpcEndpointRef: Error sending message
[message = Heartbeat(12,[Lscala.Tuple2;@6545df9a,BlockManagerId(12,
ip-10-184-194-43.ec2.internal, 43867))] in 1 attempts
java.io.IOException: Connection from ip-10-184-195-29.ec2.internal/10.184.195.29:43162 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
    at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/04/11 00:19:38 INFO storage.DiskBlockManager: Shutdown hook called
16/04/11 00:19:38 INFO util.ShutdownHookManager: Shutdown hook called