[Spark 2.0.0] java.util.concurrent.TimeoutException while writing to mongodb from Spark
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [3 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at com.stratio.datasource.mongodb.client.MongodbClientFactory$.getClient(MongodbClientFactory.scala:103)
at com.stratio.datasource.mongodb.writer.MongodbWriter.saveWithPk(MongodbWriter.scala:63)
at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(MongodbDataFrame.scala:42)
at com.stratio.datasource.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(MongodbDataFrame.scala:37)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:883)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
... 1 more
17/02/08 07:03:51 INFO spark.SparkContext: Invoking stop() from shutdown hook

Thanks & Best Regards,
Palash Gupta
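For reference, a minimal pyspark sketch of writing a DataFrame to MongoDB with the official mongo-spark-connector rather than the Stratio datasource that appears in the trace above; the URI, database, collection and input path are placeholders.

# Minimal sketch (assumptions: mongo-spark-connector 2.x on the classpath,
# a reachable MongoDB at the placeholder URI; this is NOT the Stratio connector above).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("mongo-write-sketch")
         .config("spark.mongodb.output.uri",
                 "mongodb://127.0.0.1:27017/mydb.mycollection")  # placeholder db.collection
         .getOrCreate())

df = spark.read.csv("/tmp/input/*.csv", header=True)  # any DataFrame to persist

# Write via the connector's DataFrameWriter integration.
(df.write
   .format("com.mongodb.spark.sql.DefaultSource")
   .mode("append")
   .save())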
Re: spark 2.02 error when writing to s3
Hi,

You need to add the overwrite save mode to avoid this error.

//P.Gupta

Sent from Yahoo Mail on Android

On Fri, 20 Jan, 2017 at 2:15 am, VND Tremblay, Paul wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The problem does not exist in Spark 1.6.

19:09:20 Caused by: java.io.IOException: File already exists: s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv

My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:
* If I use a small set of data, I don't get the error.
* If I use Spark 1.6, I don't get the error.
* If I read in a simple dataframe and then write to S3 (without doing any transformations), I still get the error.
* If I do the previous step with a smaller set of data, I don't get the error.
* I am using pyspark, with python 2.7.
* The thread at https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the problem is caused by a sync issue: with large datasets, Spark tries to write multiple times and causes the error. The suggestion is to turn off speculation, but I believe speculation is turned off by default in pyspark.

Thanks!

Paul

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
tremblay.p...@bcg.com
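A minimal pyspark sketch of the suggested fix, reusing the writer options from the snippet above; the input and output paths are placeholders.

# Sketch only: let Spark overwrite the target directory instead of deleting it
# out-of-band first. Assumes an existing SparkSession `spark`.
df = spark.read.csv("s3://my-bucket/some_input/", header=True)  # any DataFrame; placeholder input

(df.write
   .mode("overwrite")                              # replaces the manual remove_s3_dir() step
   .csv(path="s3://my-bucket/revenue_model/",      # hypothetical output path
        sep="|",
        header=True,
        nullValue='',
        quote=None))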
Re: Spark #cores
Hi,

I think I faced the same problem with Spark 2.1.0 when I tried to define the number of executor cores from SparkConf or the SparkSession builder in a standalone cluster: it always took all available cores. There are three ways to set it:

1. Define spark.executor.cores in conf/spark-defaults.conf; spark-submit will then read it from there. You need to set it on all hosts in the cluster.
2. Pass the parameter to spark-submit, which you have done already.
3. Set the parameter at runtime (in your code while initiating the SparkSession or SparkConf object), e.g. in python:

# Configure Spark
conf = SparkConf().setAppName(APP_NAME) \
    .set("spark.default.parallelism", "20") \
    .set("spark.cores.max", "5") \
    .set("spark.executor.cores", "2") \
    .set("spark.driver.memory", "10g") \
    .set("spark.executor.memory", "5g") \
    .set("spark.storage.memoryFraction", "0.4") \
    .set("spark.local.dir", "/tmp/my-app/")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

You can set the same properties on a SparkSession builder as well. Try options #1 and #2 (a minimal sketch follows this message) and verify from the UI how many cores are used according to your setting. Also, since you are running in standalone cluster mode, make sure the --master parameter points to the spark:// URL.

Best Regards,
P.Gupta

Sent from Yahoo Mail on Android

On Wed, 18 Jan, 2017 at 11:33 pm, Saliya Ekanayake wrote:
The Spark version I am using is 2.10. The language is Scala. This is running in standalone cluster mode. Each worker is able to use all physical CPU cores in the cluster, as is the default. I was using the following parameters to spark-submit: --conf spark.executor.cores=1 --conf spark.default.parallelism=32. Later, I read that the term "cores" doesn't mean physical CPU cores but rather the number of tasks an executor can execute. Anyway, I don't have a clear idea how to set the number of executors per physical node. I see there's an option in YARN mode, but it's not available for standalone cluster mode.

Thank you,
Saliya

On Wed, Jan 18, 2017 at 12:13 PM, Palash Gupta wrote:
Hi, Can you please share how you are assigning CPU cores, and tell us the Spark version and language you are using? //Palash

On Wed, 18 Jan, 2017 at 10:16 pm, Saliya Ekanayake wrote:
Thank you for the quick response. No, this is not Spark SQL. I am running the built-in PageRank.

On Wed, Jan 18, 2017 at 10:33 AM, wrote:
Are you talking here of Spark SQL? If yes, spark.sql.shuffle.partitions needs to be changed.

From: Saliya Ekanayake [mailto:esal...@gmail.com]
Sent: Wednesday, January 18, 2017 8:56 PM
To: User
Subject: Spark #cores

Hi, I am running a Spark application setting the number of executor cores to 1 and a default parallelism of 32 over 8 physical nodes. The web UI shows it's running on 200 cores. I can't relate this number to the parameters I've used. How can I control the parallelism in a more deterministic way?

Thank you,
Saliya

--
Saliya Ekanayake, Ph.D
Applied Computer Scientist
Network Dynamics and Simulation Science Laboratory (NDSSL)
Virginia Tech, Blacksburg
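For reference, a minimal sketch of options #1 and #2 above; the master URL, core counts and application file are placeholders, not recommended values.

# Option #1: conf/spark-defaults.conf on each host (placeholder values)
spark.executor.cores   2
spark.cores.max        5

# Option #2: the same settings passed to spark-submit on the command line
spark-submit \
  --master spark://master-host:7077 \
  --conf spark.executor.cores=2 \
  --conf spark.cores.max=5 \
  my_app.py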
Re: Spark vs MongoDB: saving DataFrame to db raises missing database name exception
Hi,

Example:

dframe = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("spark.mongodb.input.uri", "mongodb://user:pass@172.26.7.192:27017/db_name.collection_name") \
    .load()
dframe.printSchema()

One more thing: if you create a new db in mongo, please create a collection with at least one record in it. Otherwise mongo may not keep that db once the session ends.

//Palash

Sent from Yahoo Mail on Android

On Tue, 17 Jan, 2017 at 12:44 pm, Palash Gupta wrote:
Hi Marco, What is the user and password you are using for the mongodb connection? Did you enable authorization? Better to include the user & password in the mongo URL. I remember I tested it with python successfully.

Best Regards,
Palash

On Tue, 17 Jan, 2017 at 5:37 am, Marco Mistroni wrote:
hi all, I have the following snippet which loads a dataframe from a csv file and tries to save it to mongodb. For some reason, the MongoSpark.save method raises the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.output.uri' or 'spark.mongodb.output.database' property
at com.mongodb.spark.config.MongoCompanionConfig$class.databaseName(MongoCompanionConfig.scala:260)
at com.mongodb.spark.config.WriteConfig$.databaseName(WriteConfig.scala:36)

Which is bizarre, as I am pretty sure I am setting all the necessary properties in the SparkConf. Could you kindly assist? I am running Spark 2.0.1 locally with a local mongodb instance running at 127.0.0.1:27017. I am using version 2.0.0 of mongo-spark-connector and running on Scala 2.11.

kr

val spark = SparkSession
  .builder()
  .master("local")
  .appName("Spark Mongo Example")
  .getOrCreate()
spark.conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/")
spark.conf.set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/")
spark.conf.set("spark.mongodb.output.database", "test")
println(s"SparkPRoperties:${spark.conf.getAll}")

val df = getDataFrame(spark) // Loading any dataframe from a file

df.printSchema()
println(s"Head:${df.head()}")
println(s"Count:${df.count()}")
println("## SAVING TO MONGODB #")

import com.mongodb.spark.config._

val writeConfig = WriteConfig(Map("collection" -> "spark", "writeConcern.w" -> "majority"), Some(WriteConfig(spark.sparkContext)))
MongoSpark.save(df, writeConfig)
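Following on from the example above, a minimal pyspark sketch (the thread's code is Scala, but the same idea applies) of naming the database and collection in the output URI when the session is built; the URI, database, collection and input path are placeholders, and mongo-spark-connector 2.0.x is assumed to be on the classpath.

# Minimal sketch: the output URI itself names a database and collection, and the
# connector properties are set at build time. All connection details are placeholders.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local")
         .appName("Spark Mongo Example")
         .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/test.spark")
         .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/test.spark")
         .getOrCreate())

df = spark.read.csv("/tmp/input.csv", header=True)  # any DataFrame to save

(df.write
   .format("com.mongodb.spark.sql.DefaultSource")
   .mode("append")
   .save())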
Re: spark-shell running out of memory even with 6GB ?
Hello Mr. Burton,

Can you share example code of how you implemented this, for other users to see?

"So I think what we did is did a repartition too large and now we ran out of memory in spark shell."

Thanks!
P.Gupta

Sent from Yahoo Mail on Android

On Tue, 10 Jan, 2017 at 8:20 am, Kevin Burton wrote:
Ah.. ok. I think I know what's happening now. I think we found this problem when running a job and doing a repartition(). Spark is just way too sensitive to memory configuration. The 2GB per-shuffle limit is also insanely silly in 2017. So I think what we did is a repartition that was too large, and now we ran out of memory in spark-shell.

On Mon, Jan 9, 2017 at 5:53 PM, Steven Ruppert wrote:
The spark-shell process alone shouldn't take up that much memory, at least in my experience. Have you dumped the heap to see what's all in there? What environment are you running spark in? Doing stuff like RDD.collect() or .countByKey() will pull potentially a lot of data into the spark-shell heap. Another thing that can fill up the spark master process heap (which is also run in the spark-shell process) is running lots of jobs: the logged SparkEvents stick around so the UI can render them. There are some options under `spark.ui.retained*` to limit that if it's a problem.

On Mon, Jan 9, 2017 at 6:00 PM, Kevin Burton wrote:
We've had various OOM issues with spark and have been trying to track them down one by one. Now we have one in spark-shell, which is super surprising. We currently allocate 6GB to spark-shell, as confirmed via 'ps'. Why the heck would the *shell* need that much memory? I'm going to try to give it more, of course, but it would be nice to know if this is a legitimate memory constraint or there is a bug somewhere.

PS: One thought I had was that it would be nice to have spark keep track of where an OOM was encountered, and in what component.

Kevin
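A minimal pyspark sketch of the `spark.ui.retained*` suggestion above, capping how much job/stage history the driver keeps in memory; the limits shown are placeholders, not recommendations.

# Cap retained UI history so logged SparkEvents don't pile up in the driver heap.
# The numeric limits below are placeholder values.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("ui-retention-sketch")
        .set("spark.ui.retainedJobs", "100")
        .set("spark.ui.retainedStages", "100"))

sc = SparkContext(conf=conf)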
[Spark 2.1.0] Resource Scheduling Challenge in pyspark sparkSession
Hi User Team,

I'm trying to schedule resources in spark 2.1.0 using the code below, but all the CPU cores are still captured by a single spark application, and hence no other application can start. Could you please help me out?

sqlContext = SparkSession.builder \
    .master("spark://172.26.7.192:7077") \
    .config("spark.sql.warehouse.dir", "/tmp/PM/") \
    .config("spark.sql.shuffle.partitions", "6") \
    .config("spark.cores.max", "5") \
    .config("spark.executor.cores", "2") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .appName(APP_NAME) \
    .getOrCreate()

Thanks & Best Regards,
Engr. Palash Gupta
WhatsApp/Viber: +8801817181502
Skype: palash2494
Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
Hi Marco,

Yes, it was on the same host when the problem was found. Even when I tried to start on a different host, the problem is still there. Any hints or suggestions will be appreciated.

Thanks & Best Regards,
Palash Gupta

From: Marco Mistroni
To: Palash Gupta
Cc: ayan guha ; User
Sent: Thursday, January 5, 2017 1:01 PM
Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"

Hi, If it only happens when you run 2 apps at the same time, could it be that these 2 apps somehow run on the same host? Kr

On 5 Jan 2017 9:00 am, "Palash Gupta" wrote:
Hi Marco and respected members,

I have done all the possible things suggested by the forum but I'm still having the same issue:

1. I will migrate my applications to a production environment where I will have more resources.
Palash>> I migrated my application to production, where I have more CPU cores, memory and a total of 7 hosts in the spark cluster.
2. Use the Spark 2.0.0 function to load CSV rather than using the databricks api.
Palash>> Earlier I was using the databricks csv api with Spark 2.0.0. As suggested, I'm now using the spark 2.0.0 built-in csv loader.
3. In production I will run multiple spark applications at a time and try to reproduce this error for both file system and HDFS loading cases.
Palash>> Yes, I reproduced it, and it only happens when two spark applications run at a time. Please see the logs:

17/01/05 01:50:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.15.187.79): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:67)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:146)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:125)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
... 11 more
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, 10.15.187.78, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 1 on executor id: 1 hostname: 10.15.187.78.
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on executor 10.15.187.78: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 1]
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, 10.15.187.78, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 2 on executor id: 1 hostname: 10.15.187.78.
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on executor 10.15.187.78: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 2]
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, 10.15.187.76, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 6 hostname: 1
Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
oad at NativeMethodAccessorImpl.java:-2, took 2.262950 s
Traceback (most recent call last):
  File "/home/hadoop/development/datareloadwithps.py", line 851, in
    datareporcessing(expected_datetime,expected_directory_hdfs)
  File "/home/hadoop/development/datareloadwithps.py", line 204, in datareporcessing
    df_codingsc_raw = sqlContext.read.format("csv").option("header",'true').load(HDFS_BASE_URL + hdfs_dir + filename)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 147, in load
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o58.load.

Thanks & Best Regards,
Palash Gupta

From: Palash Gupta
To: Marco Mistroni
Cc: ayan guha ; User
Sent: Saturday, December 31, 2016 12:43 PM
Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"

Hi Marco,

Thanks! Please have my responses:

so you have a pyspark application running on spark 2.0
Palash>> Yes

You have python scripts dropping files on HDFS
Palash>> Yes (it is not part of the spark process, just an independent python script)

then you have two spark jobs
Palash>> Yes

- 1 load expected hour data (pls explain. How many files on average)
Palash>> 35,000 rows in each file, with at least 150 columns. Number of CSV file types: 7. Number of files for each type: 4. Total number of files: 28.

- 1 load delayed data (pls explain. how many files on average)
Palash>> We may or may not get delayed data in each hour. For example, if the connection between the CSV generation system and the spark system has a network issue, then we will get many delayed-hour files. On average: 35,000 rows in each file, with at least 150 columns. Number of CSV file types: 7. Number of files for each type: 2. Total number of files: 14.

Do these scripts run continuously (they have a while loop) or you kick them off via a job scheduler on an hourly basis
Palash>> No, this script is run from a linux cron schedule (not in a while loop).

Do these scripts run on a cluster?
Palash>> My pyspark application is running in standalone cluster mode where I have only two VMs (one master, two workers).

So, at T1 in HDFS there are 3 csv files. Your job starts up and loads all 3 of them, does aggregation etc then populates mongo
Palash>> Yes

At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2 additional. Presumably these files are not deleted). So your job now loads 5 files, does aggregation and stores data in mongodb? Or does your job at T+1 only load deltas (the two new csv files which appeared at T+1)?
Palash>> No, it will only handle newly arrived files for the new expected hour. But in delayed data handling there is a possibility to reprocess a specific hour's data, re-calculate the KPIs and update mongodb.

You said before that simply parsing csv files via spark in a standalone app works fine.
Palash>> I said that when I stopped the delayed-data-loading spark script, the expected-hour data loading became smooth and has been running well for the last three days.

Then what you can try is to do exactly the same processing you are doing, but instead of loading csv files from HDFS you can load from a local directory and see if the problem persists (this is just to exclude any issues with loading HDFS data).
Palash>> The issue is the same when loading from the file system. When I'm running only a single script it is smooth. When I'm running both scripts at a time, in two separate pyspark applications, it sometimes fails with this error while loading a file from the file system.

Now I'm doing the things below as per the suggestions:

1. I will migrate my applications to a production environment where I will have more resources.
2. Use the Spark 2.0.0 function to load CSV rather than using the databricks api.
3. In production I will run multiple spark applications at a time and try to reproduce this error for both file system and HDFS loading cases.

When I'm done I will share details with you. If you have any more suggestions from a debugging point of view, you can add them here for me.

Thanks & Best Regards,
Palash Gupta

From: Marco Mistroni
To: "spline_pal...@yahoo.com"
Cc: ayan guha ; User
Sent: Saturday, December 31, 2016 1:42 AM
Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"

Hi Palash
so you have a pyspark application running on spark 2.0 You have python scripts dropping files on HDFS then you have two spark
Re: What's the best practice to load data from RDMS to Spark
Hi,

If you want to load from csv, you can use the procedure below. Of course you need to define the spark context first. (The example loads all csv files under a folder; you can use a specific name for a single file.)

// these lines are equivalent in Spark 2.0
spark.read.format("csv").option("header", "true").load("../Downloads/*.csv")
spark.read.option("header", "true").csv("../Downloads/*.csv")

Best regards
Palash Gupta

Sent from Yahoo Mail on Android

On Fri, 30 Dec, 2016 at 11:39 pm, Raymond Xie wrote:
Hello, I am new to Spark. As a SQL developer, I have only taken some courses online and spent some time on my own, and never had a chance to work on a real project. I wonder what would be the best practice (tool, procedure...) to load data (csv, excel) into the Spark platform?

Thank you.
Raymond
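Since the subject also asks about loading from an RDBMS, here is a minimal pyspark sketch of the built-in JDBC reader; the connection URL, table name, credentials and driver class are placeholders, and the matching JDBC driver jar must be available to Spark.

# Minimal sketch of reading a table from a relational database with the Spark 2.x
# JDBC data source. All connection details below are placeholders.
# Assumes an existing SparkSession `spark`.
df = (spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://db-host:5432/mydb")
      .option("dbtable", "public.my_table")
      .option("user", "my_user")
      .option("password", "my_password")
      .option("driver", "org.postgresql.Driver")
      .load())

df.printSchema()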
Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
Hi Marco & Ayan, I have now clearer idea about what Marco means by Reduce. I will do it to dig down. Let me answer to your queries: hen you see the broadcast errors, does your job terminate? Palash>> Yes it terminated the app. Or are you assuming that something is wrong just because you see the message in the logs? Palash>> No it terminated for the very first step of Spark processing (in my case loading csv from hdfs) Plus...Wrt logicWho writes the CSV? With what frequency?Palash>> We parsed xml files using python (not in spark scope) & make csv and put in hdfs Does it app run all the time loading CSV from hadoop? Palash>> Every hour two separate pyspark app are running1. Loading current expected hour data, prepare kpi, do aggregation, load in mongodb2. Same operation will run for delayed hour data Are you using spark streaming?Palash>> No Does it app run fine with an older version of spark (1.6 )Palash>> I didn't test with Spark 1.6. My app is running now good as I stopped second app (delayed data loading) since last two days. Even most of the case both are running well except few times... Sent from Yahoo Mail on Android On Fri, 30 Dec, 2016 at 4:57 pm, Marco Mistroni wrote: Correct. I mean reduce the functionality.Uhm I realised I didn't ask u a fundamental question. When you see the broadcast errors, does your job terminate? Or are you assuming that something is wrong just because you see the message in the logs?Plus...Wrt logicWho writes the CSV? With what frequency?Does it app run all the time loading CSV from hadoop?Are you using spark streaming?Does it app run fine with an older version of spark (1.6 )Hth On 30 Dec 2016 12:44 pm, "ayan guha" wrote: @Palash: I think what Macro meant by "reduce functionality" is to reduce scope of your application's functionality so that you can isolate the issue in certain part(s) of the app...I do not think he meant "reduce" operation :) On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta wrote: Hi Marco, All of your suggestions are highly appreciated, whatever you said so far. I would apply to implement in my code and let you know. Let me answer your query: What does your program do? Palash>> In each hour I am loading many CSV files and then I'm making some KPI(s) out of them. Finally I am doing some aggregation and inserting into mongodb from spark. you say it runs for 2-3 hours, what is the logic? just processing a huge amount of data? doing ML ?Palash>> Yes you are right whatever I'm processing it should not take much time. Initially my processing was taking only 5 minutes as I was using all cores running only one application. When I created more separate spark applications for handling delayed data loading and implementing more use cases with parallel run, I started facing the error randomly. And due to separate resource distribution among four parallel spark application to run in parallel now some task is taking longer time than usual. But still it should not take 2-3 hours time... Currently whole applications are running in a development environment where we have only two VM cluster and I will migrate to production platform by next week. I will let you know if there is any improvement over there. I'd say break down your application.. reduce functionality , run and see outcome. then add more functionality, run and see again. Palash>> Macro as I'm not very good in Spark. It would be helpful for me if you provide some example of reduce functionality. Cause I'm using Spark data frame, join data frames, use SQL statement to manipulate KPI(s). 
Here How could I apply reduce functionality? Thanks & Best Regards, Palash Gupta From: Marco Mistroni To: "spline_pal...@yahoo.com" Cc: User Sent: Thursday, December 29, 2016 11:28 PM Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0" Hello no sorry i dont have any further insight into that i have seen similar errors but for completely different issues, and in most of hte cases it had to do with my data or my processing rather than Spark itself. What does your program do? you say it runs for 2-3 hours, what is the logic? just processing a huge amount of data? doing ML ? i'd say break down your application.. reduce functionality , run and see outcome. then add more functionality, run and see again. I found myself doing htese kinds of things when i got errors in my spark apps. To get a concrete help you will have to trim down the code to a few lines that can reproduces the error That will be a great start Sorry for not being of much help hth marco On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta wrote: Hi Marco, Thanks for your response. Yes I tested it before & am able to load
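As an illustration of the "reduce functionality, then add it back" approach discussed above, a hypothetical pyspark sketch; the paths, column names and aggregation are placeholders, not the actual pipeline.

# Hypothetical sketch of re-enabling pipeline stages one at a time to isolate
# where the failure appears. Paths and column names are placeholders.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("debug-by-reduction").getOrCreate()

# Step 1: only load and count -- does the broadcast error still appear?
df = spark.read.csv("hdfs:///data/2016123100/*.csv", header=True)
print("rows loaded:", df.count())

# Step 2: add the aggregation back in and run again.
agg = df.groupBy("cell_id").agg(F.avg("kpi_value").alias("avg_kpi"))
print("aggregated rows:", agg.count())

# Step 3: finally re-enable the MongoDB write (left out of this sketch).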
Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
Hi Nicholas,

Appreciated your response. I understand your point; I will implement it and let you know the status of the problem.

Sample:

// these lines are equivalent in Spark 2.0
spark.read.format("csv").option("header", "true").load("../Downloads/*.csv")
spark.read.option("header", "true").csv("../Downloads/*.csv")

Thanks & Best Regards,
Palash Gupta

From: Nicholas Hakobian
To: "spline_pal...@yahoo.com"
Cc: Marco Mistroni ; User
Sent: Thursday, December 29, 2016 10:39 PM
Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"

If you are using spark 2.0 (as listed in the stackoverflow post), why are you using the external CSV module from Databricks? Spark 2.0 includes the functionality from this external module natively, and it's possible you are mixing an older library with a newer spark, which could explain a crash.

Nicholas Szandor Hakobian, Ph.D.
Senior Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com

On Thu, Dec 29, 2016 at 4:00 AM, Palash Gupta wrote:
Hi Marco,

Thanks for your response. Yes, I tested it before and am able to load from the linux filesystem, and it also sometimes has a similar issue. However, in both cases (either from hadoop or the linux file system), this error comes up in some specific scenarios, as per my observations:

1. When two separate spark applications are initiated in parallel from one driver (not all the time, sometimes).
2. If one spark job runs for longer than expected, let's say 2-3 hours, the second application terminates giving the error.

To debug the problem, it would be good if you can share some possible reasons why the "failed to get broadcast" error may come up. Or if you need more logs, I can share them.

Thanks again, Spark User Group.

Best Regards
Palash Gupta

Sent from Yahoo Mail on Android

On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni wrote:
Hi, Please try to read a CSV from the filesystem instead of hadoop. If you can read it successfully, then your hadoop file is the issue and you can start debugging from there. Hth

On 29 Dec 2016 6:26 am, "Palash Gupta" wrote:
Hi Apache Spark User team,

Greetings! I started developing an application using Apache Hadoop and Spark with python. My pyspark application randomly terminated saying "Failed to get broadcast_1*", and I have been searching for suggestions and support on Stack Overflow at: Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application ("I was building an application on Apache Spark 2.00 with Python 3.4 and trying to load some CSV files from HDFS (...").

Could you please suggest how to register myself on the Apache user list, or how I can get suggestions or support to debug the problem I am facing? Your response will be highly appreciated.

Thanks & Best Regards,
Engr. Palash Gupta
WhatsApp/Viber: +8801817181502
Skype: palash2494
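A minimal pyspark sketch of the distinction Nicholas draws: the Spark 2.0 built-in CSV reader versus the external Databricks package, which should not be mixed with Spark 2.0. The path is a placeholder.

# Sketch only; assumes an existing SparkSession `spark`, path is a placeholder.
# Spark 2.0+: use the built-in CSV data source...
df = spark.read.csv("hdfs:///data/2016123100/*.csv", header=True)

# ...instead of the external Databricks package that Spark 1.x needed:
# df = spark.read.format("com.databricks.spark.csv") \
#          .option("header", "true") \
#          .load("hdfs:///data/2016123100/*.csv")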
[TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
Hi Apache Spark User team,

Greetings! I started developing an application using Apache Hadoop and Spark with python. My pyspark application randomly terminated saying "Failed to get broadcast_1*", and I have been searching for suggestions and support on Stack Overflow at: Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application ("I was building an application on Apache Spark 2.00 with Python 3.4 and trying to load some CSV files from HDFS (...").

Could you please suggest how to register myself on the Apache user list, or how I can get suggestions or support to debug the problem I am facing? Your response will be highly appreciated.

Thanks & Best Regards,
Engr. Palash Gupta
WhatsApp/Viber: +8801817181502
Skype: palash2494