Hi All,

I'm writing a DataFrame to MongoDB using Stratio/Spark-MongoDB. Initially it was working fine, but once the data volume grew it started failing with the timeout error below.

Could anybody help me out or suggest what fix I should apply, or how I can increase the timeout value?

My cluster setup:
- Driver and executor run in the same VM, local[5] mode, spark.driver.memory 50g
- MongoDB: 3.2.10
- Imported package: --packages com.stratio.datasource:spark-mongodb_2.11:0.12.0
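For reference, here is the failing write call, reconstructed from the traceback below (host and credentials are exactly as they appear in the log, with the password already masked):

    df_nokia_myprogram_kpi_ready_raw.write \
        .format("com.stratio.datasource.mongodb") \
        .mode('append') \
        .options(host='10.15.187.74:27017',
                 credentials='parsdev,parsdb,XXXX',
                 database='DB',
                 collection='MY_G_N_LN_HR',
                 connectionsTime='300000',
                 updateFields='S_DATETIME,CM_SBTS,CM_LNBTS,CM_LNCEL') \
        .save()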
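From the stack trace, the TimeoutException is raised by an Await.result inside com.stratio.datasource.mongodb.client.MongodbClientFactory.getClient, i.e. a writer task waiting to obtain a Mongo client, and the [3 seconds] does not match any option I pass (I already set connectionsTime='300000'). The dead-letter message from the mongodbClientFactory Akka actor suggests the connector hands out clients through an actor pool, so my guess is that the 200 concurrent write tasks (the default spark.sql.shuffle.partitions; note the "183/200" in the log) are competing for clients. A workaround sketch I'm considering, with an untested partition count:

    # Untested sketch: shrink write parallelism so fewer tasks ask
    # MongodbClientFactory for a client at the same time.
    # The partition count (10) is a guess to be tuned, not a verified value.
    df_to_save = df_nokia_myprogram_kpi_ready_raw.coalesce(10)
    df_to_save.write \
        .format("com.stratio.datasource.mongodb") \
        .mode('append') \
        .options(host='10.15.187.74:27017',
                 credentials='parsdev,parsdb,XXXX',
                 database='DB',
                 collection='MY_G_N_LN_HR',
                 connectionsTime='300000',
                 updateFields='S_DATETIME,CM_SBTS,CM_LNBTS,CM_LNCEL') \
        .save()

    # Alternative: lower shuffle parallelism for the whole job instead:
    # sqlContext.setConf("spark.sql.shuffle.partitions", "50")

Would something like this help, or is there a proper way to raise that 3-second timeout?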
Details Log:

17/02/08 07:03:51 INFO scheduler.DAGScheduler: Job 93 failed: foreachPartition at MongodbDataFrame.scala:37, took 39.026989 s
17/02/08 07:03:51 INFO executor.Executor: Finished task 182.0 in stage 253.0 (TID 25297). 60483 bytes result sent to driver
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 185.0 in stage 253.0 (TID 25300)
17/02/08 07:03:51 INFO scheduler.TaskSetManager: Finished task 182.0 in stage 253.0 (TID 25297) in 3797 ms on localhost (183/200)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 185.0 in stage 253.0 (TID 25300, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 186.0 in stage 253.0 (TID 25301)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 186.0 in stage 253.0 (TID 25301, localhost): TaskKilled (killed intentionally)
[INFO] [02/08/2017 07:03:51.283] [mongodbClientFactory-akka.actor.default-dispatcher-4] [akka://mongodbClientFactory/deadLetters] Message [com.stratio.datasource.mongodb.client.MongodbClientFactory$ClientResponse] from Actor[akka://mongodbClientFactory/user/mongoConnectionActor#1265577515] to Actor[akka://mongodbClientFactory/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 7 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 187.0 in stage 253.0 (TID 25302)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 187.0 in stage 253.0 (TID 25302, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 183.0 in stage 253.0 (TID 25298)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 253.0 (TID 25298, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 253.0, whose tasks have all completed, from pool

Traceback (most recent call last):
  File "/home/hadoop/development/myprogram/datareload_myprogram.py", line 1188, in <module>
    datareporcessing(expected_datetime,expected_directory_hdfs,sqlContext)
  File "/home/hadoop/development/myprogram/datareload_nokialte.py", line 935, in datareporcessing
    df_nokia_myprogram_kpi_ready_raw.write.format("com.stratio.datasource.mongodb").mode('append').options(host='10.15.187.74:27017', credentials='parsdev,parsdb,XXXX', database='DB', collection='MY_G_N_LN_HR', connectionsTime='300000', updateFields='S_DATETIME,CM_SBTS,CM_LNBTS,CM_LNCEL').save()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 530, in save
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o839.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 184 in stage 253.0 failed 1 times, most recent failure: Lost task 184.0 in stage 253.0 (TID 25299, localhost): java.util.concurrent.TimeoutException: Futures timed out after [3 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at com.stratio.datasource.mongodb.client.MongodbClientFactory$.getClient(MongodbClientFactory.scala:103)
    at com.stratio.datasource.mongodb.writer.MongodbWriter.saveWithPk(MongodbWriter.scala:63)
    at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(MongodbDataFrame.scala:42)
    at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(MongodbDataFrame.scala:37)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:883)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:881)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:881)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2117)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2117)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2117)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2116)
    at com.stratio.datasource.mongodb.MongodbDataFrame.saveToMongodb(MongodbDataFrame.scala:37)
    at com.stratio.datasource.mongodb.MongodbRelation.insert(MongodbRelation.scala:106)
    at com.stratio.datasource.mongodb.DefaultSource.createRelation(DefaultSource.scala:59)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [3 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at com.stratio.datasource.mongodb.client.MongodbClientFactory$.getClient(MongodbClientFactory.scala:103)
    at com.stratio.datasource.mongodb.writer.MongodbWriter.saveWithPk(MongodbWriter.scala:63)
    at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(MongodbDataFrame.scala:42)
    at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(MongodbDataFrame.scala:37)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more

17/02/08 07:03:51 INFO spark.SparkContext: Invoking stop() from shutdown hook

Thanks & Best Regards,
Palash Gupta