[Spark 2.0.0] java.util.concurrent.TimeoutException while writing to mongodb from Spark
Hi all,

I'm writing a DataFrame to MongoDB using Stratio/Spark-MongoDB. Initially it was working fine, but once the data volume grew it started failing with the error in the subject. Details are below. Could anybody help me out, suggest what solution I should apply, or tell me how I can increase the timeout value?

My cluster setup:
Driver and executor run in the same VM - local[5] mode
spark.driver.memory 50g
MongoDB: 3.2.10
Imported package: --packages com.stratio.datasource:spark-mongodb_2.11:0.12.0

Detailed log:
17/02/08 07:03:51 INFO scheduler.DAGScheduler: Job 93 failed: foreachPartition at MongodbDataFrame.scala:37, took 39.026989 s
17/02/08 07:03:51 INFO executor.Executor: Finished task 182.0 in stage 253.0 (TID 25297). 60483 bytes result sent to driver
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 185.0 in stage 253.0 (TID 25300)
17/02/08 07:03:51 INFO scheduler.TaskSetManager: Finished task 182.0 in stage 253.0 (TID 25297) in 3797 ms on localhost (183/200)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 185.0 in stage 253.0 (TID 25300, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 3 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 186.0 in stage 253.0 (TID 25301)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 186.0 in stage 253.0 (TID 25301, localhost): TaskKilled (killed intentionally)
[INFO] [02/08/2017 07:03:51.283] [mongodbClientFactory-akka.actor.default-dispatcher-4] [akka://mongodbClientFactory/deadLetters] Message [com.stratio.datasource.mongodb.client.MongodbClientActor$ClientResponse] from Actor[akka://mongodbClientFactory/user/mongoConnectionActor#1265577515] to Actor[akka://mongodbClientFactory/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Getting 7 non-empty blocks out of 200 blocks
17/02/08 07:03:51 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 187.0 in stage 253.0 (TID 25302)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 187.0 in stage 253.0 (TID 25302, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO executor.Executor: Executor killed task 183.0 in stage 253.0 (TID 25298)
17/02/08 07:03:51 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 253.0 (TID 25298, localhost): TaskKilled (killed intentionally)
17/02/08 07:03:51 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 253.0, whose tasks have all completed, from pool

Traceback (most recent call last):
  File "/home/hadoop/development/myprogram/datareload_myprogram.py", line 1188, in
    datareporcessing(expected_datetime, expected_directory_hdfs, sqlContext)
  File "/home/hadoop/development/myprogram/datareload_nokialte.py", line 935, in datareporcessing
    df_nokia_myprogram_kpi_ready_raw.write.format("com.stratio.datasource.mongodb").mode('append').options(host='10.15.187.74:27017', credentials='parsdev,parsdb,', database='DB', collection='MY_G_N_LN_HR', connectionsTime='30', updateFields='S_DATETIME,CM_SBTS,CM_LNBTS,CM_LNCEL').save()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 530, in save
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o839.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 184 in stage 253.0 failed 1 times, most recent failure: Lost task 184.0 in stage 253.0 (TID 25299, localhost): java.util.concurrent.TimeoutException: Futures timed out after [3 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.conc
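For reference, here is a minimal sketch of what I plan to try next. The option names are the ones from my own snippet above; the larger connectionsTime value and the coalesce() call are assumptions on my part, not a verified fix.

    # Assumption: connectionsTime is the Stratio connector's client timeout in
    # seconds, and fewer concurrent writers reduces pressure on the client pool.
    df_out = df_nokia_myprogram_kpi_ready_raw.coalesce(20)
    df_out.write.format("com.stratio.datasource.mongodb").mode('append').options(
        host='10.15.187.74:27017',
        credentials='parsdev,parsdb,',
        database='DB',
        collection='MY_G_N_LN_HR',
        connectionsTime='120',  # raised from '30'; assumed to govern the 3s future timeout
        updateFields='S_DATETIME,CM_SBTS,CM_LNBTS,CM_LNCEL').save()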
Re: MongoDB and Spark
Use map-reduce.

On Fri, Sep 11, 2015, 14:32 Mishra, Abhishek <abhishek.mis...@xerox.com> wrote:
> Hello,
> Is there any way to query multiple collections from mongodb using spark and java? I want to create only one Configuration object. Please help if anyone has something regarding this.
> Thank you,
> Abhishek
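A rough illustration of what "use map-reduce" could look like when the aggregation runs inside MongoDB itself. pymongo is used here only for the sketch; the database, collection, and field names are placeholders, not anything from the original thread.

    from pymongo import MongoClient
    from bson.code import Code

    # Placeholder names throughout: count documents per 'status' value with
    # MongoDB's built-in map-reduce, so the work runs inside MongoDB rather
    # than being pulled into Spark first.
    client = MongoClient('mongodb://localhost:27017')
    coll = client['mydb']['collection_a']

    map_fn = Code("function () { emit(this.status, 1); }")
    reduce_fn = Code("function (key, values) { return Array.sum(values); }")

    # Writes the per-status counts to the 'status_counts' collection.
    result = coll.map_reduce(map_fn, reduce_fn, out='status_counts')
    for doc in result.find():
        print(doc)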
MongoDB and Spark
Hello,

Is there any way to query multiple collections from mongodb using spark and java? I want to create only one Configuration object. Please help if anyone has something regarding this.

Thank you,
Abhishek
RE: MongoDB and Spark
Anything using Spark RDDs?

Abhishek

From: Sandeep Giri [mailto:sand...@knowbigdata.com]
Sent: Friday, September 11, 2015 3:19 PM
To: Mishra, Abhishek; user@spark.apache.org; d...@spark.apache.org
Subject: Re: MongoDB and Spark

> Use map-reduce.
Re: MongoDB and Spark
I think it should be possible by loading the collections as RDDs and then doing a union on them.

Regards,
Sandeep Giri
www.KnowBigData.com <http://knowbigdata.com>

On Fri, Sep 11, 2015 at 3:40 PM, Mishra, Abhishek <abhishek.mis...@xerox.com> wrote:
> Anything using Spark RDDs?
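Something along these lines, as a sketch only: the host, database, and collection names are placeholders, it assumes the Stratio datasource accepts the same host/database/collection options shown elsewhere in this thread, and it assumes both collections share a compatible schema.

    # Sketch: load two collections through the Stratio datasource and union them
    # into one DataFrame. Connection details are placeholders, not a tested setup.
    df_a = (sqlContext.read.format("com.stratio.datasource.mongodb")
            .options(host='localhost:27017', database='mydb', collection='collection_a')
            .load())
    df_b = (sqlContext.read.format("com.stratio.datasource.mongodb")
            .options(host='localhost:27017', database='mydb', collection='collection_b')
            .load())

    # unionAll requires both DataFrames to have the same schema.
    df_all = df_a.unionAll(df_b)
    df_all.registerTempTable("combined")
    sqlContext.sql("SELECT COUNT(*) FROM combined").show()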
Re: MongoDB and Spark
Unfortunately, MongoDB does not directly expose its locality via its client API, so the problem with trying to schedule Spark tasks against it is that the tasks cannot be scheduled locally on the nodes containing the query results. That means you have to assume most results will be sent over the network to the task that needs to process them. This is bad.

The other reason (also related to locality) is that I'm not sure there's an easy way to spread the results of a query over multiple different clients, so you'd probably have to start your Spark RDD with a single partition and then repartition. At that point you've taken data from multiple MongoDB nodes, collected it on a single node, and then re-partitioned it, again across the network, onto multiple nodes. This is also bad.

I think this is why it was recommended to use MongoDB's map-reduce: MongoDB can use its locality information internally. I had this same issue with Couchbase a couple of years back. It's unfortunate, but it's the reality.

On Fri, Sep 11, 2015 at 9:34 AM, Sandeep Giri <sand...@knowbigdata.com> wrote:
> I think it should be possible by loading the collections as RDDs and then doing a union on them.
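To make the single-partition point concrete, the workaround usually looks something like this (a sketch only; the partition counts are illustrative, and df_all stands for a DataFrame loaded from MongoDB as in the earlier sketch):

    # If the datasource hands back one partition, the usual workaround is to
    # repartition -- but that shuffles the already-transferred data over the
    # network a second time, which is exactly the cost described above.
    rdd = df_all.rdd
    print(rdd.getNumPartitions())         # e.g. 1: everything lands on a single task

    rdd_spread = rdd.repartition(64)      # second network hop just to spread the load
    print(rdd_spread.getNumPartitions())  # 64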