Hi Marco,

Yes, it was on the same host when the problem was found. Even when I tried to start it on a different host, the problem was still there. Any hints or suggestions will be appreciated.

Thanks & Best Regards,
Palash Gupta
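[Editor's note: since the failure in this thread reproduces only when two Spark applications run concurrently on a small standalone cluster, one common first step (not suggested in the thread itself) is to cap each application's share of the cluster so concurrent submissions do not contend for all cores and memory. A minimal sketch in plain Python, building a `spark-submit` command line; `spark.cores.max` and `spark.executor.memory` are real standalone-mode properties, while `delayed_load.py` is a hypothetical name for the second script.]

```python
# Sketch: cap each concurrently running Spark app's resources on a
# standalone cluster.  "spark.cores.max" and "spark.executor.memory" are
# standard Spark properties; the first script name comes from the traceback
# later in the thread, the second is a hypothetical stand-in.
def submit_command(app_script, cores_max, executor_mem):
    """Build a spark-submit invocation that caps this app's resources."""
    confs = {
        "spark.cores.max": str(cores_max),      # total cores this app may take
        "spark.executor.memory": executor_mem,  # heap per executor
    }
    cmd = ["spark-submit"]
    for key, value in sorted(confs.items()):
        cmd += ["--conf", "%s=%s" % (key, value)]
    cmd.append(app_script)
    return cmd

# One capped command per hourly app, so neither can starve the other:
expected_cmd = submit_command("datareloadwithps.py", 4, "2g")
delayed_cmd = submit_command("delayed_load.py", 2, "1g")
print(" ".join(expected_cmd))
```

With caps like these, the two hourly jobs can be scheduled side by side without either one claiming every core of the two-VM cluster.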
From: Marco Mistroni <mmistr...@gmail.com>
To: Palash Gupta <spline_pal...@yahoo.com>
Cc: ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
Sent: Thursday, January 5, 2017 1:01 PM
Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"

Hi,
If it only happens when you run 2 apps at the same time, could it be that these 2 apps somehow run on the same host?
Kr

On 5 Jan 2017 9:00 am, "Palash Gupta" <spline_pal...@yahoo.com> wrote:

Hi Marco and respected members,

I have done all the possible things suggested by the forum, but I'm still having the same issue:

1. I will migrate my applications to a production environment where I will have more resources.
Palash>> I migrated my application to production, where I have more CPU cores, more memory, and a total of 7 hosts in the Spark cluster.

2. Use the Spark 2.0.0 function to load CSV rather than using the Databricks API.
Palash>> Earlier I was using the databricks csv API with Spark 2.0.0. As suggested by one of the mates, I'm now using the Spark 2.0.0 built-in CSV loader.

3. In production I will run multiple Spark applications at a time and try to reproduce this error for both file system and HDFS loading cases.
Palash>> Yes, I reproduced it, and it only happens when two Spark applications run at the same time. Please see the logs:

17/01/05 01:50:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.15.187.79): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:67)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:146)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:125)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
        ... 11 more
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, 10.15.187.78, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 1 on executor id: 1 hostname: 10.15.187.78.
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on executor 10.15.187.78: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 1]
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, 10.15.187.78, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 2 on executor id: 1 hostname: 10.15.187.78.
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on executor 10.15.187.78: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 2]
17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, 10.15.187.76, partition 0, ANY, 7305 bytes)
17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 6 hostname: 10.15.187.76.
17/01/05 01:50:16 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on executor 10.15.187.76: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 3]
17/01/05 01:50:16 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
17/01/05 01:50:16 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/01/05 01:50:16 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
17/01/05 01:50:16 INFO scheduler.DAGScheduler: ResultStage 0 (load at NativeMethodAccessorImpl.java:-2) failed in 2.110 s
17/01/05 01:50:16 INFO scheduler.DAGScheduler: Job 0 failed: load at NativeMethodAccessorImpl.java:-2, took 2.262950 s
Traceback (most recent call last):
  File "/home/hadoop/development/datareloadwithps.py", line 851, in <module>
    datareporcessing(expected_datetime, expected_directory_hdfs)
  File "/home/hadoop/development/datareloadwithps.py", line 204, in datareporcessing
    df_codingsc_raw = sqlContext.read.format("csv").option("header", 'true').load(HDFS_BASE_URL + hdfs_dir + filename)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 147, in load
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o58.load.

Thanks & Best Regards,
Palash Gupta

From: Palash Gupta <spline_pal...@yahoo.com>
To: Marco Mistroni <mmistr...@gmail.com>
Cc: ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
Sent: Saturday, December 31, 2016 12:43 PM
Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"

Hi Marco,

Thanks! Please have my response:

so you have a pyspark application running on spark 2.0
Palash>> Yes

You have python scripts dropping files on HDFS
Palash>> Yes (it is not part of the Spark process, just an independent Python script)

then you have two spark jobs
Palash>> Yes

- 1 load expected hour data (pls explain. How many files on average)
Palash>> At least 35,000 rows in each file, with 150 columns.
Number of CSV file types: 7
Number of files for each type: 4
Total number of files: 28

- 1 load delayed data (pls explain. how many files on average)
Palash>> We may or may not get delayed data in each hour. But if, for example, the network between the CSV generation system and the Spark system has an issue, then we will get many delayed hour files.
On average: at least 35,000 rows in each file, with 150 columns.
Number of CSV file types: 7
Number of files for each type: 2
Total number of files: 14

Do these scripts run continuously (they have a while loop) or you kick them off via a job scheduler on an hourly basis
Palash>> No, this script is run from a Linux cron schedule (not in a while loop).

Do these scripts run on a cluster?
Palash>> My pyspark application is running in standalone cluster mode, where I have only two VMs (one master, two workers).

So, at T1 in HDFS there are 3 csv files. Your job starts up and loads all 3 of them, does aggregation etc then populates mongo
Palash>> Yes

At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2 additional; presumably these files are not deleted). So your job now loads 5 files, does aggregation and stores data in mongodb? Or does your job at T+1 only load deltas (the two new csv files which appeared at T+1)?
Palash>> No, it will only handle newly arrived files for the new expected hour. But in delayed-data handling there is a possibility of reprocessing a specific hour's data, re-calculating the KPIs, and updating them in mongodb.

You said before that simply parsing csv files via spark in a standalone app works fine.
Palash>> I said that since I stopped the delayed-data-loading Spark script, the expected-hour data loading has been smooth and running well for the last three days.

Then what you can try is to do exactly the same processing you are doing, but instead of loading csv files from HDFS you can load from a local directory and see if the problem persists... (this is just to exclude any issues with loading HDFS data.)
Palash>> The issue is the same loading from the file system. When I'm running only a single script, it is smooth. When I'm running both scripts at a time in two separate pyspark applications, it sometimes fails with this error while loading a file from the file system.

Now I'm doing the below things as per the suggestions:

1. I will migrate my applications to a production environment where I will have more resources.
2. Use the Spark 2.0.0 function to load CSV rather than using the Databricks API.
3. In production I will run multiple Spark applications at a time and try to reproduce this error for both file system and HDFS loading cases.

When I'm done I will share the details with you. If you have any more suggestions from a debugging point of view, you can add them here for me.

Thanks & Best Regards,
Palash Gupta

From: Marco Mistroni <mmistr...@gmail.com>
To: "spline_pal...@yahoo.com" <spline_pal...@yahoo.com>
Cc: ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
Sent: Saturday, December 31, 2016 1:42 AM
Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"

Hi Palash,

so you have a pyspark application running on spark 2.0
You have python scripts dropping files on HDFS
then you have two spark jobs
- 1 load expected hour data (pls explain. How many files on average)
- 1 load delayed data (pls explain. how many files on average)

Do these scripts run continuously (they have a while loop) or you kick them off via a job scheduler on an hourly basis
Do these scripts run on a cluster?

So, at T1 in HDFS there are 3 csv files. Your job starts up and loads all 3 of them, does aggregation etc then populates mongo
At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2 additional; presumably these files are not deleted). So your job now loads 5 files, does aggregation and stores data in mongodb? Or does your job at T+1 only load deltas (the two new csv files which appeared at T+1)?

You said before that simply parsing csv files via spark in a standalone app works fine. Then what you can try is to do exactly the same processing you are doing, but instead of loading csv files from HDFS you can load from a local directory and see if the problem persists... (this is just to exclude any issues with loading HDFS data.)
hth
Marco

On Fri, Dec 30, 2016 at 2:02 PM, Palash Gupta <spline_pal...@yahoo.com> wrote:

Hi Marco & Ayan,

I now have a clearer idea about what Marco means by "reduce". I will do it to dig down. Let me answer your queries:

When you see the broadcast errors, does your job terminate?
Palash>> Yes, it terminated the app.

Or are you assuming that something is wrong just because you see the message in the logs?
Palash>> No, it terminated at the very first step of Spark processing (in my case, loading csv from hdfs).

Plus... wrt logic... who writes the CSV? With what frequency?
Palash>> We parse xml files using python (not in Spark scope), make csv, and put them in hdfs.

Does the app run all the time loading CSV from hadoop?
Palash>> Every hour two separate pyspark apps are running:
1. Load the current expected hour's data, prepare KPIs, do aggregation, load into mongodb.
2. The same operation runs for delayed hour data.

Are you using spark streaming?
Palash>> No

Does the app run fine with an older version of spark (1.6)?
Palash>> I didn't test with Spark 1.6. My app has been running well since I stopped the second app (delayed data loading) two days ago. Even in most cases both run well, except a few times...

Sent from Yahoo Mail on Android

On Fri, 30 Dec, 2016 at 4:57 pm, Marco Mistroni <mmistr...@gmail.com> wrote:

Correct, I mean reduce the functionality. Uhm, I realised I didn't ask you a fundamental question. When you see the broadcast errors, does your job terminate? Or are you assuming that something is wrong just because you see the message in the logs? Plus... wrt logic... who writes the CSV?
With what frequency? Does the app run all the time loading CSV from hadoop? Are you using spark streaming? Does the app run fine with an older version of spark (1.6)?
Hth

On 30 Dec 2016 12:44 pm, "ayan guha" <guha.a...@gmail.com> wrote:

@Palash: I think what Marco meant by "reduce functionality" is to reduce the scope of your application's functionality, so that you can isolate the issue to certain part(s) of the app... I do not think he meant the "reduce" operation :)

On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta <spline_pal...@yahoo.com.invalid> wrote:

Hi Marco,

All of your suggestions are highly appreciated, whatever you have said so far. I will apply them in my code and let you know. Let me answer your query:

What does your program do?
Palash>> Each hour I load many CSV files and then compute some KPI(s) from them. Finally I do some aggregation and insert the results into mongodb from Spark.

you say it runs for 2-3 hours, what is the logic? just processing a huge amount of data? doing ML?
Palash>> Yes, you are right: whatever I'm processing should not take much time. Initially my processing took only 5 minutes, as I was using all cores and running only one application. When I created more separate Spark applications to handle delayed data loading and implement more use cases in parallel, I started facing the error randomly. And since the resources are now divided among four Spark applications running in parallel, some tasks take longer than usual. But it still should not take 2-3 hours...

Currently the whole set of applications is running in a development environment where we have only a two-VM cluster; I will migrate to the production platform by next week. I will let you know if there is any improvement over there.

I'd say break down your application... reduce functionality, run and see the outcome. Then add more functionality, run and see again.
Palash>> Marco, as I'm not very good at Spark, it would be helpful for me if you could provide some example of reducing functionality. I'm using Spark data frames, joining data frames, and using SQL statements to manipulate the KPI(s). How could I apply "reduce functionality" here?

Thanks & Best Regards,
Palash Gupta

From: Marco Mistroni <mmistr...@gmail.com>
To: "spline_pal...@yahoo.com" <spline_pal...@yahoo.com>
Cc: User <user@spark.apache.org>
Sent: Thursday, December 29, 2016 11:28 PM
Subject: Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"

Hello,

no, sorry, I don't have any further insight into that... I have seen similar errors, but for completely different issues, and in most of the cases it had to do with my data or my processing rather than Spark itself.

What does your program do? You say it runs for 2-3 hours; what is the logic? Just processing a huge amount of data? Doing ML?

I'd say break down your application... reduce functionality, run and see the outcome. Then add more functionality, run and see again. I found myself doing these kinds of things when I got errors in my Spark apps. To get concrete help you will have to trim down the code to a few lines that can reproduce the error. That will be a great start.

Sorry for not being of much help.

hth
marco

On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta <spline_pal...@yahoo.com> wrote:

Hi Marco,

Thanks for your response. Yes, I tested it before and am able to load from the Linux filesystem, and it also sometimes has a similar issue. However, in both cases (either from hadoop or the Linux file system), this error comes in some specific scenarios, as per my observations:

1. When two parallel, separate Spark applications are initiated from one driver (not all the time, sometimes).
2. If one Spark job has been running for longer than expected, say 2-3 hours, the second application terminates giving the error.
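[Editor's note: the "break down your application / reduce functionality" advice in this thread can be made mechanical. A minimal plain-Python sketch, with hypothetical stage names standing in for the real load/KPI/aggregate/write-to-mongo steps: run cumulative prefixes of the pipeline, so the first failing prefix identifies the stage that introduces the error.]

```python
# Illustration of the "reduce functionality" debugging strategy suggested
# in the thread: run the pipeline with one stage enabled, then two, and so
# on, so the first failing prefix points at the stage that introduces the
# error.  Stage names are hypothetical stand-ins for the real steps.
def run_stages(stages, data):
    """Run cumulative prefixes of the pipeline; return (#working stages, error)."""
    for n in range(1, len(stages) + 1):
        result = data
        try:
            for name, fn in stages[:n]:
                result = fn(result)
        except Exception as exc:
            return n - 1, "stage %r failed: %s" % (stages[n - 1][0], exc)
    return len(stages), None

stages = [
    ("load_csv",  lambda d: d + [("h1", 10), ("h2", 30)]),
    ("make_kpi",  lambda d: [(h, v * 2) for h, v in d]),
    ("aggregate", lambda d: sum(v for _, v in d)),
    ("store",     lambda total: total / 0),  # deliberate bug for the demo
]
ok, err = run_stages(stages, [])
print(ok, err)  # three stages succeed; "store" is the culprit
```

The same idea applies to a pyspark job: comment out everything after the CSV load, confirm it runs clean, then re-enable the join, the SQL-based KPI logic, and the mongodb write one at a time.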
To debug the problem, it would help if you could share some possible reasons why the "failed to get broadcast" error may occur. Or if you need more logs, I can share them.

Thanks again, Spark User Group.

Best Regards,
Palash Gupta

Sent from Yahoo Mail on Android

On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni <mmistr...@gmail.com> wrote:

Hi,
Please try to read a CSV from the filesystem instead of hadoop. If you can read it successfully, then your hadoop file is the issue and you can start debugging from there.
Hth

On 29 Dec 2016 6:26 am, "Palash Gupta" <spline_pal...@yahoo.com.invalid> wrote:

Hi Apache Spark User team,

Greetings!

I started developing an application using Apache Hadoop and Spark using python. My pyspark application randomly terminated saying "Failed to get broadcast_1*", and I have been searching for suggestions and support on Stack Overflow: "Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application" ("I was building an application on Apache Spark 2.00 with Python 3.4 and trying to load some CSV files from HDFS...")

Could you please provide suggestions for registering myself on the Apache user list, or how I can get suggestions or support to debug the problem I am facing?

Your response will be highly appreciated.

Thanks & Best Regards,
Engr. Palash Gupta
WhatsApp/Viber: +8801817181502
Skype: palash2494

--
Best Regards,
Ayan Guha
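[Editor's footnote: earlier in the thread Marco asks whether the hourly job reloads every CSV or only the deltas, and Palash answers that each run handles only newly arrived files. That delta-detection pattern can be sketched in plain Python, independent of Spark: record already-processed file names in a state file, so each cron-driven run returns only the new CSVs to load. The directory layout, file names, and state-file name here are hypothetical, not taken from the original scripts.]

```python
import json
import os
import tempfile

# Sketch of the delta-loading pattern discussed in the thread: each hourly
# cron run processes only CSV files not seen on a previous run.
def new_csv_files(directory, state_path):
    """Return CSVs in `directory` not yet recorded in `state_path`, and record them."""
    seen = set()
    if os.path.exists(state_path):
        with open(state_path) as f:
            seen = set(json.load(f))
    current = {f for f in os.listdir(directory) if f.endswith(".csv")}
    new = sorted(current - seen)
    with open(state_path, "w") as f:
        json.dump(sorted(seen | current), f)
    return new

# Demo with a throwaway directory standing in for the HDFS drop folder:
d = tempfile.mkdtemp()
state = os.path.join(d, "processed.json")
open(os.path.join(d, "hour01.csv"), "w").close()
first = new_csv_files(d, state)   # first run sees hour01.csv
open(os.path.join(d, "hour02.csv"), "w").close()
second = new_csv_files(d, state)  # second run sees only the delta, hour02.csv
```

A reprocessing path for delayed hours, as described in the thread, would simply remove the relevant names from the state file so the next run picks those hours up again.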