Re: Issue with parquet write after join (Spark 1.4.0)

2015-07-01 Thread Pooja Jain
The join is happening successfully, as I am able to do a count() after the join.

The error comes only while trying to write in Parquet format to HDFS.
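
One thing we could also try (a sketch only, reusing mergedDF, dfSchema, hdfsfilepath
and our Spark.getSqlContext() helper from the code flow quoted below) is to persist
the merged RDD, so that the count() and the Parquet write reuse the same materialized
result instead of re-running the join for each action:

import org.apache.spark.storage.StorageLevel

// sketch: cache the merged rows so count() and the Parquet write reuse the
// same materialized result instead of re-running the join/shuffle twice
mergedDF.persist(StorageLevel.MEMORY_AND_DISK)
println(s"merged rows: ${mergedDF.count()}")   // forces the join once

val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)
newdf.write.parquet(hdfsfilepath)
mergedDF.unpersist()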

Thanks,
Pooja.

On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 It says:

 Caused by: java.net.ConnectException: Connection refused: slave2/...:54845

 Could you look in the executor logs (stderr on slave2) and see what made
 it shut down? Since you are doing a join, there's a high possibility of an
 OOM etc.
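
 For reference, a minimal tuning sketch in that direction (the property names
 below exist in Spark 1.4; the values are only illustrative and would need to
 be sized for the actual cluster):

 import org.apache.spark.{SparkConf, SparkContext}

 // illustrative values only: more executor heap plus off-heap headroom for the
 // shuffle, and a longer network timeout so a long GC pause during the join is
 // less likely to get the executor marked as lost
 val conf = new SparkConf()
   .setAppName("parquet-write-after-join")
   .set("spark.executor.memory", "6g")
   .set("spark.yarn.executor.memoryOverhead", "1024")  // MB of off-heap headroom on YARN
   .set("spark.network.timeout", "600s")               // default is 120s
 val sc = new SparkContext(conf)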


 Thanks
 Best Regards

 On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain pooja.ja...@gmail.com wrote:

 Hi,

 We are using Spark 1.4.0 on Hadoop in yarn-cluster mode via
 spark-submit. We are facing a Parquet write issue after doing DataFrame joins.

 We have a full data set and an incremental data set. We are reading them
 as DataFrames, joining them, and then writing the result to HDFS
 in Parquet format. We are getting a timeout error on the last partition.

 But if we do a count on the joined data it works, which gives us
 confidence that the join is happening properly. It times out only when
 writing to HDFS.

 Code flow:

 // join two data frames - dfBase and dfIncr on primaryKey
 val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), "outer")

 // applying a reduce function on each row.
 val mergedDF = joinedDF.map(x =>
   reduceFunc(x)
 )

 //converting back to dataframe
 val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)

 //writing to parquet file
 newdf.write.parquet(hdfsfilepath)

 Getting following exception:

 15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with no 
 recent heartbeats: 255766 ms exceeds timeout 24 ms
 15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on 
 slave2: Executor heartbeat timed out after 255766 ms
 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 
 from TaskSet 7.0
 15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 
 (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 
 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
 15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
 15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to 
 kill executor(s) 26
 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove 
 executor 26 from BlockManagerMaster.
 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block 
 manager BlockManagerId(26, slave2, 54845)
 15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully 
 in removeExecutor
 15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number 
 of 26 executor(s).
 15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now 
 unavailable on executor 26 (193/200, false)
 15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested 
 to kill executor(s) 26.
 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated 
 or disconnected! Shutting down. slave2:51849
 15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on 
 slave2: remote Rpc client disassociated
 15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 
 from TaskSet 7.0
 15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
 15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove 
 executor 26 from BlockManagerMaster.
 15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully 
 in removeExecutor
 15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with 
 remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address is 
 now gated for [5000] ms. Reason is: [Disassociated].
 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated 
 or disconnected! Shutting down. slave2:51849
 15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 
 (TID 310, slave2): org.apache.spark.SparkException: Task failed while 
 writing rows.
  at 
 org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
  at 
 org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
  at 
 org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
  at org.apache.spark.scheduler.Task.run(Task.scala:70)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to
 connect to slave2/...:54845

Issue with parquet write after join (Spark 1.4.0)

2015-06-30 Thread Pooja Jain
Hi,

We are using Spark 1.4.0 on Hadoop in yarn-cluster mode via
spark-submit. We are facing a Parquet write issue after doing DataFrame joins.

We have a full data set and an incremental data set. We are reading them
as DataFrames, joining them, and then writing the result to HDFS
in Parquet format. We are getting a timeout error on the last partition.

But if we do a count on the joined data it works, which gives us
confidence that the join is happening properly. It times out only when
writing to HDFS.

Code flow:

// join two data frames - dfBase and dfIncr on primaryKey
val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), "outer")

// applying a reduce function on each row.
val mergedDF = joinedDF.map(x =>
  reduceFunc(x)
)

//converting back to dataframe
val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)

//writing to parquet file
newdf.write.parquet(hdfsfilepath)
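
For reference, a self-contained sketch of the same flow (sqlContext, reduceFunc,
dfSchema and hdfsFilePath here are hypothetical stand-ins for our own helpers),
mainly to show the types involved:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.StructType

def mergeAndWrite(sqlContext: SQLContext,
                  dfBase: DataFrame,
                  dfIncr: DataFrame,
                  primaryKey: String,
                  dfSchema: StructType,
                  reduceFunc: Row => Row,
                  hdfsFilePath: String): Unit = {
  // full outer join on the primary key; the join type is passed as a plain string
  val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), "outer")

  // in Spark 1.4, DataFrame.map returns an RDD[Row] when the function yields Rows
  val mergedRows: RDD[Row] = joinedDF.map(reduceFunc)

  // reattach the schema and write out as Parquet
  val mergedDF = sqlContext.createDataFrame(mergedRows, dfSchema)
  mergedDF.write.parquet(hdfsFilePath)
}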

Getting following exception:

15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26
with no recent heartbeats: 255766 ms exceeds timeout 24 ms
15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26
on slave2: Executor heartbeat timed out after 255766 ms
15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for
26 from TaskSet 7.0
15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in
stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost)
15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in
stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes)
15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3)
15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting
to kill executor(s) 26
15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to
remove executor 26 from BlockManagerMaster.
15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing
block manager BlockManagerId(26, slave2, 54845)
15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26
successfully in removeExecutor
15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total
number of 26 executor(s).
15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is
now unavailable on executor 26 (193/200, false)
15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver
requested to kill executor(s) 26.
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver
terminated or disconnected! Shutting down. slave2:51849
15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26
on slave2: remote Rpc client disassociated
15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for
26 from TaskSet 7.0
15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5)
15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to
remove executor 26 from BlockManagerMaster.
15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26
successfully in removeExecutor
15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association
with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed,
address is now gated for [5000] ms. Reason is: [Disassociated].
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver
terminated or disconnected! Shutting down. slave2:51849
15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in
stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task
failed while writing rows.
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at 
org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to
connect to slave2/...:54845
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:84)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)