Re: Multiple Sources found for csv
Sorry, just found this, which answers my question: https://stackoverflow.com/questions/41726340/spark-2-0-csv-error ("Spark 2.0 CSV Error", stackoverflow.com: "I am upgrading to spark 2 from 1.6 and am having an issue reading in CSV files. In spark 1.6 I would have something like this to read in a CSV file. val df = sqlContext.read.format("com.databricks...") ____ From: jeff saremi <jeffsar...@hotmail.com> Sent: Tuesday, September 12, 2017 3:38:00 PM To: user@spark.apache.org Subject: Multiple Sources found for csv I have this line which works in the Spark interactive console but fails in IntelliJ, using Spark 2.1.1 in both cases: Exception in thread "main" java.lang.RuntimeException: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), please specify the fully qualified class name. Source: val ie8df = sqlContext.read.schema(SomeSchema).option("mode","dropmalformed").option("sep", "\t").format("csv").load(somepath)
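For reference, a minimal sketch of what the linked answer boils down to, reusing SomeSchema and somepath from the post above. This is untested and assumes the conflict comes from having com.databricks:spark-csv on the classpath alongside Spark 2.x:

```
// Sketch only: either drop the com.databricks:spark-csv dependency from the build
// (Spark 2.x ships its own CSV source), or name the built-in source by its fully
// qualified class name so the lookup is no longer ambiguous.
val ie8df = sqlContext.read
  .schema(SomeSchema)
  .option("mode", "dropmalformed")
  .option("sep", "\t")
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
  .load(somepath)
```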
Multiple Sources found for csv
I have this line which works in the Spark interactive console but fails in IntelliJ, using Spark 2.1.1 in both cases: Exception in thread "main" java.lang.RuntimeException: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), please specify the fully qualified class name. Source: val ie8df = sqlContext.read.schema(SomeSchema).option("mode","dropmalformed").option("sep", "\t").format("csv").load(somepath)
Re: Continue reading dataframe from file despite errors
thanks Suresh. it worked nicely From: Suresh Thalamati <suresh.thalam...@gmail.com> Sent: Tuesday, September 12, 2017 2:59:29 PM To: jeff saremi Cc: user@spark.apache.org Subject: Re: Continue reading dataframe from file despite errors Try the CSV Option(“mode”, "dropmalformed”), that might skip the error records. On Sep 12, 2017, at 2:33 PM, jeff saremi <jeffsar...@hotmail.com<mailto:jeffsar...@hotmail.com>> wrote: should have added some of the exception to be clear: 17/09/12 14:14:17 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15, localhost, executor driver): java.lang.NumberFormatException: For input string: "south carolina" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272) at scala.collection.immutable.StringOps.toInt(StringOps.scala:29) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:250) ________ From: jeff saremi <jeffsar...@hotmail.com<mailto:jeffsar...@hotmail.com>> Sent: Tuesday, September 12, 2017 2:32:03 PM To: user@spark.apache.org<mailto:user@spark.apache.org> Subject: Continue reading dataframe from file despite errors I'm using a statement like the following to load my dataframe from some text file Upon encountering the first error, the whole thing throws an exception and processing stops. I'd like to continue loading even if that results in zero rows in my dataframe. How can i do that? thanks spark.read.schema(SomeSchema).option("sep", "\t").format("csv").load("somepath")
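For completeness, a sketch of the original read statement with the suggested option spelled out (SomeSchema and the path are placeholders from the question):

```
// DROPMALFORMED tells the CSV reader to silently skip rows whose values cannot be
// cast to the declared schema, instead of failing the whole job.
val df = spark.read
  .schema(SomeSchema)
  .option("mode", "DROPMALFORMED")
  .option("sep", "\t")
  .format("csv")
  .load("somepath")
```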
Re: Continue reading dataframe from file despite errors
should have added some of the exception to be clear: 17/09/12 14:14:17 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15, localhost, executor driver): java.lang.NumberFormatException: For input string: "south carolina" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272) at scala.collection.immutable.StringOps.toInt(StringOps.scala:29) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:250) ____ From: jeff saremi <jeffsar...@hotmail.com> Sent: Tuesday, September 12, 2017 2:32:03 PM To: user@spark.apache.org Subject: Continue reading dataframe from file despite errors I'm using a statement like the following to load my dataframe from some text file Upon encountering the first error, the whole thing throws an exception and processing stops. I'd like to continue loading even if that results in zero rows in my dataframe. How can i do that? thanks spark.read.schema(SomeSchema).option("sep", "\t").format("csv").load("somepath")
Continue reading dataframe from file despite errors
I'm using a statement like the following to load my dataframe from a text file. Upon encountering the first error, the whole thing throws an exception and processing stops. I'd like to continue loading even if that results in zero rows in my dataframe. How can I do that? thanks spark.read.schema(SomeSchema).option("sep", "\t").format("csv").load("somepath")
Re: How can i remove the need for calling cache
Thanks, Vadim. Yes, this is a good option for us. thanks From: Vadim Semenov <vadim.seme...@datadoghq.com> Sent: Wednesday, August 2, 2017 6:24:40 PM To: Suzen, Mehmet Cc: jeff saremi; user@spark.apache.org Subject: Re: How can i remove the need for calling cache So if you just save an RDD to HDFS via 'saveAsSequenceFile', you would have to create a new RDD that reads that data; this way you'll avoid recomputing the RDD but may lose time on saving/loading. Exactly the same thing happens in 'checkpoint'; 'checkpoint' is just a convenient method that gives you the same RDD back, basically. However, if your job fails, there's no way to run a new job using already 'checkpointed' data from a previous failed run. That's where having a custom checkpointer helps. Another note: you can not delete "checkpoint"ed data in the same job, you need to delete it some other way. BTW, have you tried '.persist(StorageLevel.DISK_ONLY)'? It caches data to local disk, making more space in the JVM and letting you avoid HDFS. On Wednesday, August 2, 2017, Vadim Semenov <vadim.seme...@datadoghq.com> wrote: `saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so it just saves data to some destination. `cache/persist` allow you to cache data and keep the DAG in case some executor that holds data goes down, so Spark would still be able to recalculate missing partitions. `localCheckpoint` allows you to sacrifice fault-tolerance and truncate the DAG, so if some executor goes down, the job will fail, because it has already forgotten the DAG. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1551-L1610 And `checkpoint` allows you to save data to some shared storage and truncate the DAG, so if an executor goes down, the job will be able to take missing partitions from the place where it saved the RDD: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1549 On Wed, Aug 2, 2017 at 7:20 PM, Suzen, Mehmet <su...@acm.org> wrote: On 3 August 2017 at 01:05, jeff saremi <jeffsar...@hotmail.com> wrote: > Vadim: > > This is from the Mastering Spark book: > > "It is strongly recommended that a checkpointed RDD is persisted in memory, > otherwise saving it on a file will require recomputation." Is this really true? I had the impression that the DAG will not be carried out once the RDD is serialized to an external file, so does 'saveAsObjectFile' save the DAG as well?
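A minimal sketch of the '.persist(StorageLevel.DISK_ONLY)' suggestion above, applied to the code from the original question in this thread (myrdd, op1 and op2 are the poster's names; untested):

```
import org.apache.spark.storage.StorageLevel

// Spill the materialized partitions to local executor disk instead of the JVM heap,
// so the second map reuses them without an HDFS round trip.
myrdd.persist(StorageLevel.DISK_ONLY)
val result1 = myrdd.map(op1(_))
val result2 = myrdd.map(op2(_))
```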
Re: How can i remove the need for calling cache
Vadim: This is from the Mastering Spark book: "It is strongly recommended that a checkpointed RDD is persisted in memory, otherwise saving it on a file will require recomputation." To me that means checkpoint will not prevent the recomputation that i was hoping for From: Vadim Semenov <vadim.seme...@datadoghq.com> Sent: Tuesday, August 1, 2017 12:05:17 PM To: jeff saremi Cc: user@spark.apache.org Subject: Re: How can i remove the need for calling cache You can use `.checkpoint()`: ``` val sc: SparkContext sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory") myrdd.checkpoint() val result1 = myrdd.map(op1(_)) result1.count() // Will save `myrdd` to HDFS and do map(op1… val result2 = myrdd.map(op2(_)) result2.count() // Will load `myrdd` from HDFS and do map(op2… ``` On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi <jeffsar...@hotmail.com<mailto:jeffsar...@hotmail.com>> wrote: Calling cache/persist fails all our jobs (i have posted 2 threads on this). And we're giving up hope in finding a solution. So I'd like to find a workaround for that: If I save an RDD to hdfs and read it back, can I use it in more than one operation? Example: (using cache) // do a whole bunch of transformations on an RDD myrdd.cache() val result1 = myrdd.map(op1(_)) val result2 = myrdd.map(op2(_)) // in the above I am assuming that a call to cache will prevent all previous transformation from being calculated twice I'd like to somehow get result1 and result2 without duplicating work. How can I do that? thanks Jeff
Re: How can i remove the need for calling cache
Thanks Mark. I'll examine the status more carefully to observe this. From: Mark Hamstra <m...@clearstorydata.com> Sent: Tuesday, August 1, 2017 11:25:46 AM To: user@spark.apache.org Subject: Re: How can i remove the need for calling cache Very likely, much of the potential duplication is already being avoided even without calling cache/persist. When running the above code without `myrdd.cache`, have you looked at the Spark web UI for the Jobs? For at least one of them you will likely see that many Stages are marked as "skipped", which means that prior shuffle files that cover the results of those Stages were still available, so Spark did not recompute those results. Spark will eventually clean up those shuffle files (unless you hold onto a reference to them), but if your Jobs using myrdd run fairly close together in time, then duplication is already minimized even without an explicit cache call. On Tue, Aug 1, 2017 at 11:05 AM, jeff saremi <jeffsar...@hotmail.com<mailto:jeffsar...@hotmail.com>> wrote: Calling cache/persist fails all our jobs (i have posted 2 threads on this). And we're giving up hope in finding a solution. So I'd like to find a workaround for that: If I save an RDD to hdfs and read it back, can I use it in more than one operation? Example: (using cache) // do a whole bunch of transformations on an RDD myrdd.cache() val result1 = myrdd.map(op1(_)) val result2 = myrdd.map(op2(_)) // in the above I am assuming that a call to cache will prevent all previous transformation from being calculated twice I'd like to somehow get result1 and result2 without duplicating work. How can I do that? thanks Jeff
Re: How can i remove the need for calling cache
Thanks Vadim. I'll try that From: Vadim Semenov <vadim.seme...@datadoghq.com> Sent: Tuesday, August 1, 2017 12:05:17 PM To: jeff saremi Cc: user@spark.apache.org Subject: Re: How can i remove the need for calling cache You can use `.checkpoint()`: ``` val sc: SparkContext sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory") myrdd.checkpoint() val result1 = myrdd.map(op1(_)) result1.count() // Will save `myrdd` to HDFS and do map(op1… val result2 = myrdd.map(op2(_)) result2.count() // Will load `myrdd` from HDFS and do map(op2… ``` On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi <jeffsar...@hotmail.com<mailto:jeffsar...@hotmail.com>> wrote: Calling cache/persist fails all our jobs (i have posted 2 threads on this). And we're giving up hope in finding a solution. So I'd like to find a workaround for that: If I save an RDD to hdfs and read it back, can I use it in more than one operation? Example: (using cache) // do a whole bunch of transformations on an RDD myrdd.cache() val result1 = myrdd.map(op1(_)) val result2 = myrdd.map(op2(_)) // in the above I am assuming that a call to cache will prevent all previous transformation from being calculated twice I'd like to somehow get result1 and result2 without duplicating work. How can I do that? thanks Jeff
Re: How can i remove the need for calling cache
here are the threads that talk about problems we're experiencing. These problems exacerbate when we use cache/persist https://www.mail-archive.com/user@spark.apache.org/msg64987.html https://www.mail-archive.com/user@spark.apache.org/msg64986.html So I am looking for a way to reproduce the same effect as in my sample code without the use of cache(). If I use myrdd.count() would that be a good alternative? thanks From: lucas.g...@gmail.com <lucas.g...@gmail.com> Sent: Tuesday, August 1, 2017 11:23:04 AM To: jeff saremi Cc: user@spark.apache.org Subject: Re: How can i remove the need for calling cache Hi Jeff, that looks sane to me. Do you have additional details? On 1 August 2017 at 11:05, jeff saremi <jeffsar...@hotmail.com<mailto:jeffsar...@hotmail.com>> wrote: Calling cache/persist fails all our jobs (i have posted 2 threads on this). And we're giving up hope in finding a solution. So I'd like to find a workaround for that: If I save an RDD to hdfs and read it back, can I use it in more than one operation? Example: (using cache) // do a whole bunch of transformations on an RDD myrdd.cache() val result1 = myrdd.map(op1(_)) val result2 = myrdd.map(op2(_)) // in the above I am assuming that a call to cache will prevent all previous transformation from being calculated twice I'd like to somehow get result1 and result2 without duplicating work. How can I do that? thanks Jeff
How can i remove the need for calling cache
Calling cache/persist fails all our jobs (I have posted 2 threads on this), and we're giving up hope of finding a solution. So I'd like to find a workaround for that: If I save an RDD to HDFS and read it back, can I use it in more than one operation? Example: (using cache) // do a whole bunch of transformations on an RDD myrdd.cache() val result1 = myrdd.map(op1(_)) val result2 = myrdd.map(op2(_)) // in the above I am assuming that a call to cache will prevent all previous transformations from being calculated twice I'd like to somehow get result1 and result2 without duplicating work. How can I do that? thanks Jeff
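A hedged sketch of the save-and-reload workaround the question asks about; the HDFS path and the MyRecord element type are placeholders, and whether the save/load time beats recomputation depends on the job:

```
// Materialize the expensive RDD once, then build a fresh RDD on top of the saved copy.
myrdd.saveAsObjectFile("hdfs:///tmp/myrdd-snapshot")
val reloaded = sc.objectFile[MyRecord]("hdfs:///tmp/myrdd-snapshot")

// Both results now read the snapshot instead of recomputing the upstream transformations.
val result1 = reloaded.map(op1(_))
val result2 = reloaded.map(op2(_))
```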
Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232
asking this on a tangent: Is there anyway for the shuffle data to be replicated to more than one server? thanks From: jeff saremi <jeffsar...@hotmail.com> Sent: Friday, July 28, 2017 4:38:08 PM To: Juan Rodríguez Hortalá Cc: user@spark.apache.org Subject: Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232 Thanks Juan for taking the time Here's more info: - This is running on Yarn in Master mode - See config params below - This is a corporate environment. In general nodes should not be added or removed that often to the cluster. Even if that is the case I would expect that to be one or 2 servers. In my case I get hundreds of these errors before the job fails. --master yarn-cluster ^ --driver-memory 96G ^ --executor-memory 48G ^ --num-executors 150 ^ --executor-cores 8 ^ --driver-cores 8 ^ --conf spark.yarn.executor.memoryOverhead=36000 ^ --conf spark.shuffle.service.enabled=true ^ --conf spark.yarn.submit.waitAppCompletion=false ^ --conf spark.yarn.submit.file.replication=64 ^ --conf spark.yarn.maxAppAttempts=1 ^ --conf spark.speculation=true ^ --conf spark.speculation.quantile=0.9 ^ --conf spark.yarn.executor.nodeLabelExpression="prod" ^ --conf spark.yarn.am.nodeLabelExpression="prod" ^ --conf spark.stage.maxConsecutiveAttempts=1000 ^ --conf spark.yarn.scheduler.heartbeat.interval-ms=15000 ^ --conf spark.yarn.launchContainer.count.simultaneously=50 ^ --conf spark.driver.maxResultSize=16G ^ --conf spark.network.timeout=1000s ^ From: Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> Sent: Friday, July 28, 2017 4:20:40 PM To: jeff saremi Cc: user@spark.apache.org Subject: Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232 Hi Jeff, Can you provide more information about how are you running your job? In particular: - which cluster manager are you using? It is YARN, Mesos, Spark Standalone? - with configuration options are you using to submit the job? In particular are you using dynamic allocation or external shuffle? You should be able to see this in the Environment tab of the Spark UI, looking for spark.dynamicAllocation.enabled and spark.shuffle.service.enabled. - in which environment are you running the jobs? Is this an on premise cluster or some cloud provider? Are you adding or removing nodes from the cluster during the job execution? FetchFailedException errors happen during execution when an executor is not able to read the shuffle blocks for a previous stage that are served by other executor. That might happen if the executor that has to serve the files dies and internal shuffle is used, although there can be other reasons like network errors. If you are using dynamic allocation then you should also enable external shuffle service so shuffle blocks can be served by the node manager after the executor that created the blocks is terminated, see https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation for more details. On Fri, Jul 28, 2017 at 9:57 AM, jeff saremi <jeffsar...@hotmail.com<mailto:jeffsar...@hotmail.com>> wrote: We have a not too complex and not too large spark job that keeps dying with this error I have researched it and I have not seen any convincing explanation on why I am not using a shuffle service. Which server is the one that is refusing the connection? 
If I go to the server that is being reported in the error message, I see a lot of these errors towards the end: java.io.FileNotFoundException: D:\data\yarnnm\local\usercache\hadoop\appcache\application_1500970459432_1024\blockmgr-7f3a1abc-2b8b-4e51-9072-8c12495ec563\0e\shuffle_0_4107_0.index (may or may not be related to the problem at all) and if you examine further on this machine there are fetchfailedexceptions resulting from other machines and so on and so forth This is Spark 1.6 on Yarn-master Could anyone provide some insight or solution to this? thanks
Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232
Thanks Juan for taking the time Here's more info: - This is running on Yarn in Master mode - See config params below - This is a corporate environment. In general nodes should not be added or removed that often to the cluster. Even if that is the case I would expect that to be one or 2 servers. In my case I get hundreds of these errors before the job fails. --master yarn-cluster ^ --driver-memory 96G ^ --executor-memory 48G ^ --num-executors 150 ^ --executor-cores 8 ^ --driver-cores 8 ^ --conf spark.yarn.executor.memoryOverhead=36000 ^ --conf spark.shuffle.service.enabled=true ^ --conf spark.yarn.submit.waitAppCompletion=false ^ --conf spark.yarn.submit.file.replication=64 ^ --conf spark.yarn.maxAppAttempts=1 ^ --conf spark.speculation=true ^ --conf spark.speculation.quantile=0.9 ^ --conf spark.yarn.executor.nodeLabelExpression="prod" ^ --conf spark.yarn.am.nodeLabelExpression="prod" ^ --conf spark.stage.maxConsecutiveAttempts=1000 ^ --conf spark.yarn.scheduler.heartbeat.interval-ms=15000 ^ --conf spark.yarn.launchContainer.count.simultaneously=50 ^ --conf spark.driver.maxResultSize=16G ^ --conf spark.network.timeout=1000s ^ From: Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> Sent: Friday, July 28, 2017 4:20:40 PM To: jeff saremi Cc: user@spark.apache.org Subject: Re: Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232 Hi Jeff, Can you provide more information about how are you running your job? In particular: - which cluster manager are you using? It is YARN, Mesos, Spark Standalone? - with configuration options are you using to submit the job? In particular are you using dynamic allocation or external shuffle? You should be able to see this in the Environment tab of the Spark UI, looking for spark.dynamicAllocation.enabled and spark.shuffle.service.enabled. - in which environment are you running the jobs? Is this an on premise cluster or some cloud provider? Are you adding or removing nodes from the cluster during the job execution? FetchFailedException errors happen during execution when an executor is not able to read the shuffle blocks for a previous stage that are served by other executor. That might happen if the executor that has to serve the files dies and internal shuffle is used, although there can be other reasons like network errors. If you are using dynamic allocation then you should also enable external shuffle service so shuffle blocks can be served by the node manager after the executor that created the blocks is terminated, see https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation for more details. On Fri, Jul 28, 2017 at 9:57 AM, jeff saremi <jeffsar...@hotmail.com<mailto:jeffsar...@hotmail.com>> wrote: We have a not too complex and not too large spark job that keeps dying with this error I have researched it and I have not seen any convincing explanation on why I am not using a shuffle service. Which server is the one that is refusing the connection? 
If I go to the server that is being reported in the error message, I see a lot of these errors towards the end: java.io.FileNotFoundException: D:\data\yarnnm\local\usercache\hadoop\appcache\application_1500970459432_1024\blockmgr-7f3a1abc-2b8b-4e51-9072-8c12495ec563\0e\shuffle_0_4107_0.index (may or may not be related to the problem at all) and if you examine further on this machine there are fetchfailedexceptions resulting from other machines and so on and so forth This is Spark 1.6 on Yarn-master Could anyone provide some insight or solution to this? thanks
Re: How to configure spark on Yarn cluster
The only relevant setting i see in Yarn is this: yarn.nodemanager.resource.memory-mb 120726 which is 120GB and we are well below that. I don't see a total limit. I haven't played with spark.memory.fraction. I'm not sure if it makes a difference. Note that there are no errors coming from Spark with respect to memory being an issue. Yarn kills the JVM and just prints out one line: Out of memory in the stdout of the container. After that Spark complains about the ExecutorLostFailure. So the memory factions are not playing a factor here. I just looked at the link you included. Thank you. Yes this is the same problem however it looks like no one has come up with a solution for this problem yet From: yohann jardin <yohannjar...@hotmail.com> Sent: Friday, July 28, 2017 10:47:40 AM To: jeff saremi; user@spark.apache.org Subject: Re: How to configure spark on Yarn cluster Not sure that we are OK on one thing: Yarn limitations are for the sum of all nodes, while you only specify the memory for a single node through Spark. By the way, the memory displayed in the UI is only a part of the total memory allocation: https://spark.apache.org/docs/latest/configuration.html#memory-management It corresponds to “spark.memory.fraction”, so it will mainly be filled by the rdd you’re trying to persist. The memory left by this parameter will be used to read the input file and compute. When the fail comes from this, the Out Of Memory exception is quite explicit in the driver logs. Testing sampled files of 1 GB, 1 TB, 10 TB should help investigate what goes right and what goes wrong at least a bit. Also, did you check for similar issues on stackoverflow? Like https://stackoverflow.com/questions/40781354/container-killed-by-yarn-for-exceeding-memory-limits-10-4-gb-of-10-4-gb-physic Regards, Yohann Jardin Le 7/28/2017 à 6:05 PM, jeff saremi a écrit : Thanks so much Yohann I checked the Storage/Memory column in Executors status page. Well below where I wanted to be. I will try the suggestion on smaller data sets. I am also well within the Yarn limitations (128GB). In my last try I asked for 48+32 (overhead). So somehow I am exceeding that or I should say Spark is exceeding since I am trusting to manage the memory I provided for it. Is there anything in Shuffle Write Size, Shuffle Spill, or anything in the logs that I should be looking for to come up with the recommended memory size or partition count? thanks From: yohann jardin <yohannjar...@hotmail.com><mailto:yohannjar...@hotmail.com> Sent: Thursday, July 27, 2017 11:15:39 PM To: jeff saremi; user@spark.apache.org<mailto:user@spark.apache.org> Subject: Re: How to configure spark on Yarn cluster Check the executor page of the Spark UI, to check if your storage level is limiting. Also, instead of starting with 100 TB of data, sample it, make it work, and grow it little by little until you reached 100 TB. This will validate the workflow and let you see how much data is shuffled, etc. And just in case, check the limits you set on your Yarn queue. If you try to allocate more memory to your job than what is set on the queue, there might be cases of failure. Though there are some limitations, it’s possible to allocate more ram to your job than available on your Yarn queue. Yohann Jardin Le 7/28/2017 à 8:03 AM, jeff saremi a écrit : I have the simplest job which i'm running against 100TB of data. 
The job keeps failing with ExecutorLostFailure's on containers killed by Yarn for exceeding memory limits I have varied the executor-memory from 32GB to 96GB, the spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to the number of cores, and driver size. It looks like nothing stops this error (running out of memory) from happening. Looking at metrics reported by Spark status page, is there anything I can use to configure my job properly? Is repartitioning more or less going to help at all? The current number of partitions is around 40,000 currently. Here's the gist of the code: val input = sc.textFile(path) val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a))) t0.persist(StorageLevel.DISK_ONLY) I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to no avail.
Job keeps aborting because of org.apache.spark.shuffle.FetchFailedException: Failed to connect to server/ip:39232
We have a not too complex and not too large spark job that keeps dying with this error. I have researched it and I have not seen any convincing explanation of why. I am not using a shuffle service. Which server is the one that is refusing the connection? If I go to the server that is being reported in the error message, I see a lot of these errors towards the end: java.io.FileNotFoundException: D:\data\yarnnm\local\usercache\hadoop\appcache\application_1500970459432_1024\blockmgr-7f3a1abc-2b8b-4e51-9072-8c12495ec563\0e\shuffle_0_4107_0.index (may or may not be related to the problem at all), and if you examine further on this machine there are FetchFailedExceptions resulting from other machines, and so on and so forth. This is Spark 1.6 on Yarn-master. Could anyone provide some insight or solution to this? thanks
Re: How to configure spark on Yarn cluster
Thanks so much Yohann I checked the Storage/Memory column in Executors status page. Well below where I wanted to be. I will try the suggestion on smaller data sets. I am also well within the Yarn limitations (128GB). In my last try I asked for 48+32 (overhead). So somehow I am exceeding that or I should say Spark is exceeding since I am trusting to manage the memory I provided for it. Is there anything in Shuffle Write Size, Shuffle Spill, or anything in the logs that I should be looking for to come up with the recommended memory size or partition count? thanks From: yohann jardin <yohannjar...@hotmail.com> Sent: Thursday, July 27, 2017 11:15:39 PM To: jeff saremi; user@spark.apache.org Subject: Re: How to configure spark on Yarn cluster Check the executor page of the Spark UI, to check if your storage level is limiting. Also, instead of starting with 100 TB of data, sample it, make it work, and grow it little by little until you reached 100 TB. This will validate the workflow and let you see how much data is shuffled, etc. And just in case, check the limits you set on your Yarn queue. If you try to allocate more memory to your job than what is set on the queue, there might be cases of failure. Though there are some limitations, it’s possible to allocate more ram to your job than available on your Yarn queue. Yohann Jardin Le 7/28/2017 à 8:03 AM, jeff saremi a écrit : I have the simplest job which i'm running against 100TB of data. The job keeps failing with ExecutorLostFailure's on containers killed by Yarn for exceeding memory limits I have varied the executor-memory from 32GB to 96GB, the spark.yarn.executor.memoryOverhead from 8192 to 36000 and similar changes to the number of cores, and driver size. It looks like nothing stops this error (running out of memory) from happening. Looking at metrics reported by Spark status page, is there anything I can use to configure my job properly? Is repartitioning more or less going to help at all? The current number of partitions is around 40,000 currently. Here's the gist of the code: val input = sc.textFile(path) val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a))) t0.persist(StorageLevel.DISK_ONLY) I have changed storagelevel from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to no avail.
How to configure spark on Yarn cluster
I have the simplest job which I'm running against 100TB of data. The job keeps failing with ExecutorLostFailure's on containers killed by Yarn for exceeding memory limits. I have varied the executor-memory from 32GB to 96GB, the spark.yarn.executor.memoryOverhead from 8192 to 36000, and made similar changes to the number of cores and driver size. It looks like nothing stops this error (running out of memory) from happening. Looking at the metrics reported by the Spark status page, is there anything I can use to configure my job properly? Is repartitioning to more or fewer partitions going to help at all? The current number of partitions is around 40,000. Here's the gist of the code: val input = sc.textFile(path) val t0 = input.map(s => s.split("\t").map(a => ((a(0),a(1)), a))) t0.persist(StorageLevel.DISK_ONLY) I have changed the storage level from MEMORY_ONLY to MEMORY_AND_DISK to DISK_ONLY to no avail.
Re: How to list only errors for a stage
Thank you. That helps From: 周康 <zhoukang199...@gmail.com> Sent: Monday, July 24, 2017 8:04:51 PM To: jeff saremi Cc: user@spark.apache.org Subject: Re: How to list only errors for a stage Maybe you can click the Status column header of the Task section; then failed tasks will appear first. 2017-07-25 10:02 GMT+08:00 jeff saremi <jeffsar...@hotmail.com>: On the Spark status UI you can click Stages on the menu and see Active (and completed) stages. For the active stage, you can see Succeeded/Total and a count of failed ones in parentheses. I'm looking for a way to go straight to the failed tasks and list the errors. Currently I must go into the details of that stage, then scroll down to the Tasks section, change the number of records per page so I can see everything, and click Go. There is no way that I can just filter the ones with errors. thanks jeff
How to list only errors for a stage
On the Spark status UI you can click Stages on the menu and see Active (and completed) stages. For the active stage, you can see Succeeded/Total and a count of failed ones in parentheses. I'm looking for a way to go straight to the failed tasks and list the errors. Currently I must go into the details of that stage, then scroll down to the Tasks section, change the number of records per page so I can see everything, and click Go. There is no way that I can just filter the ones with errors. thanks jeff
Re: Is there "EXCEPT ALL" in Spark SQL?
EXCEPT is not the same as EXCEPT ALL. Had they implemented EXCEPT ALL in SparkSQL, one could have easily obtained EXCEPT by adding a distinct() to the results. From: hareesh makam <makamhare...@gmail.com> Sent: Thursday, July 6, 2017 12:48:18 PM To: jeff saremi Cc: user@spark.apache.org Subject: Re: Is there "EXCEPT ALL" in Spark SQL? There is except in the DataFrame API: df1.except(df2) The same can be used in SQL as well. public DataFrame except(DataFrame other): Returns a new DataFrame containing rows in this frame but not in another frame. This is equivalent to EXCEPT in SQL. (See https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/DataFrame.html) -Hareesh On 6 July 2017 at 12:22, jeff saremi <jeffsar...@hotmail.com> wrote: I tried this query in 1.6 and it failed: SELECT * FROM Table1 EXCEPT ALL SELECT * FROM Table2 Exception in thread "main" java.lang.RuntimeException: [1.32] failure: ``('' expected but `all' found thanks Jeff
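There is no built-in EXCEPT ALL in that version; the following is only a rough, untested sketch of emulating its duplicate-preserving semantics by counting rows on each side. It assumes df1 and df2 share a schema, all names are placeholders, and the exact join overload may differ between Spark versions:

```
import org.apache.spark.sql.functions._

val keyCols = df1.columns.toSeq
val left  = df1.groupBy(keyCols.map(col): _*).agg(count(lit(1)).as("cnt1"))
val right = df2.groupBy(keyCols.map(col): _*).agg(count(lit(1)).as("cnt2"))

// EXCEPT ALL keeps each row max(cnt1 - cnt2, 0) times.
val survivors = left.join(right, keyCols, "left_outer")
  .withColumn("keep", col("cnt1") - coalesce(col("cnt2"), lit(0L)))
  .where(col("keep") > 0)
// Rows with keep > 1 still need to be expanded back into duplicates (e.g. with a
// flatMap over survivors.rdd), which is exactly why a native EXCEPT ALL would be simpler.
```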
Is there "EXCEPT ALL" in Spark SQL?
I tried this query in 1.6 and it failed: SELECT * FROM Table1 EXCEPT ALL SELECT * FROM Table2 Exception in thread "main" java.lang.RuntimeException: [1.32] failure: ``('' expected but `all' found thanks Jeff
Re: What is the equivalent of mapPartitions in SparkSQL?
I have to read up on the writer. But would the writer get records back from somewhere? I want to do a bulk operation and continue with the results in the form of a dataframe. Currently the UDF does this: 1 scalar -> 1 scalar. The UDAF does this: M records -> 1 scalar. I want this: M records -> M records (or M scalars), or in the broadest sense: M records -> N records. I think this capability is left out of SparkSQL, forcing us to go back to Spark core using map*, groupBy*, and reduceBy* functions and the like. Being forced to keep converting between SQL and non-SQL is very annoying, and as such forces us to stay conservative and just make do without SQL. I'm sure we're not alone here. From: Aaron Perrin <aper...@gravyanalytics.com> Sent: Tuesday, June 27, 2017 4:50:25 PM To: Ryan; jeff saremi Cc: user@spark.apache.org Subject: Re: What is the equivalent of mapPartitions in SparkSQL? I'm assuming some things here, but hopefully I understand. So, basically you have a big table of data distributed across a bunch of executors. And, you want an efficient way to call a native method for each row. It sounds similar to a dataframe writer to me. Except, instead of writing to disk or network, you're 'writing' to a native function. Would a custom dataframe writer work? That's what I'd try first. If that doesn't work for your case, you could also try adding a column where the column function does the native call. However, if doing it that way, you'd have to ensure that the column function actually gets called for all rows. (An interesting side effect of that is that you could catch JNI/WinAPI errors there and set the column value to the result.) There are other ways, too, if those options don't work... On Sun, Jun 25, 2017 at 8:07 PM jeff saremi <jeffsar...@hotmail.com> wrote: My specific and immediate need is this: We have a native function wrapped in JNI. To increase performance we'd like to avoid calling it record by record. mapPartitions() gives us the ability to invoke this in bulk. We're looking for a similar approach in SQL. From: Ryan <ryan.hd@gmail.com> Sent: Sunday, June 25, 2017 7:18:32 PM To: jeff saremi Cc: user@spark.apache.org Subject: Re: What is the equivalent of mapPartitions in SparkSQL? Why would you like to do so? I think there's no need for us to explicitly ask for a forEachPartition in Spark SQL because Tungsten is smart enough to figure out whether a SQL operation could be applied on each partition or whether there has to be a shuffle. On Sun, Jun 25, 2017 at 11:32 PM, jeff saremi <jeffsar...@hotmail.com> wrote: You can do a map() using a select and functions/UDFs. But how do you process a partition using SQL?
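A rough sketch of the drop-down-to-the-RDD route described above; nativeBatchCall is a hypothetical stand-in for the JNI wrapper, and resultSchema plus the single-string row layout are purely illustrative:

```
import org.apache.spark.sql.Row

val processedRdd = df.rdd.mapPartitions { rows =>
  val batch   = rows.toArray                                  // hand a whole partition to native code at once
  val outputs = nativeBatchCall(batch.map(_.getString(0)))    // hypothetical bulk JNI call
  batch.iterator.zip(outputs.iterator).map { case (row, out) => Row(row.getString(0), out) }
}
// Back to SQL land; resultSchema (defined elsewhere) describes the (input, output) columns above.
val processedDf = sqlContext.createDataFrame(processedRdd, resultSchema)
```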
Re: What is the equivalent of mapPartitions in SparkSQL?
My specific and immediate need is this: We have a native function wrapped in JNI. To increase performance we'd like to avoid calling it record by record. mapPartitions() gives us the ability to invoke this in bulk. We're looking for a similar approach in SQL. From: Ryan <ryan.hd@gmail.com> Sent: Sunday, June 25, 2017 7:18:32 PM To: jeff saremi Cc: user@spark.apache.org Subject: Re: What is the equivalent of mapPartitions in SparkSQL? Why would you like to do so? I think there's no need for us to explicitly ask for a forEachPartition in Spark SQL because Tungsten is smart enough to figure out whether a SQL operation could be applied on each partition or whether there has to be a shuffle. On Sun, Jun 25, 2017 at 11:32 PM, jeff saremi <jeffsar...@hotmail.com> wrote: You can do a map() using a select and functions/UDFs. But how do you process a partition using SQL?
What is the equivalent of mapPartitions in SparkSQL?
You can do a map() using a select and functions/UDFs. But how do you process a partition using SQL?
Re: Bizarre diff in behavior between scala REPL and sparkSQL UDF
Never mind! I had a space at the end of my data which was not showing up in manual testing. thanks From: jeff saremi <jeffsar...@hotmail.com> Sent: Tuesday, June 20, 2017 2:48:06 PM To: user@spark.apache.org Subject: Bizarre diff in behavior between scala REPL and sparkSQL UDF I have this function which does a regex matching in scala. When I test it in the REPL I get expected results. When I use it as a UDF in sparkSQL I get completely incorrect results. Function: class UrlFilter (filters: Seq[String]) extends Serializable { val regexFilters = filters.map(new Regex(_)) regexFilters.foreach(println) def matches(s: String) : Boolean = { if(s == null || s.isEmpty) return false regexFilters.exists(f => {print("matching " + f + " against " + s); s match { case f() => { println("; matched! returning true"); true }; case _ => { println("; did NOT match. returning false"); false } }}) } } Instantiating it with a pattern like: ^[^:]+://[^.]*\.company[0-9]*9\.com$ (matches a url that has company in the name and a number that ends in the digit 9) Test it in the Scala REPL: scala> val filters = Source.fromFile("D:\\cosmos-modules\\testdata\\fakefilters.txt").getLines.toList scala> val urlFilter = new UrlFilter(filters) scala> urlFilter.matches("ftp://ftp.company9.com") matching ^[^:]+://[^.]*\.company[0-9]*9\.com$ against ftp://ftp.company9.com; matched! returning true res2: Boolean = true Use it in SparkSQL: val urlFilter = new UrlFilter(filters) sqlContext.udf.register("filterListMatch", (url: String) => urlFilter.matches(url)) val nonMatchingUrlsDf = sqlContext.sql("SELECT url FROM distinctUrls WHERE NOT filterListMatch(url)") Look at the debug prints in the console: matching ^[^:]+://[^.]*\.company[0-9]*9\.com$ against ftp://ftp.company9.com ; did NOT match. returning false I have repeated this several times to make sure I'm comparing apples to apples. I am using Spark 1.6 and Scala 2.10.5 with Java 1.8 thanks
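For anyone hitting the same thing, one small, untested way to harden the matches() method from this thread against stray whitespace in the data:

```
def matches(s: String): Boolean = {
  if (s == null) return false
  val trimmed = s.trim                 // drop the trailing/leading whitespace that broke the anchored regex
  if (trimmed.isEmpty) return false
  regexFilters.exists(f => trimmed match {
    case f() => true
    case _   => false
  })
}
```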
Bizarre diff in behavior between scala REPL and sparkSQL UDF
I have this function which does a regex matching in scala. When I test it in the REPL I get expected results. When I use it as a UDF in sparkSQL I get completely incorrect results. Function: class UrlFilter (filters: Seq[String]) extends Serializable { val regexFilters = filters.map(new Regex(_)) regexFilters.foreach(println) def matches(s: String) : Boolean = { if(s == null || s.isEmpty) return false regexFilters.exists(f => {print("matching " + f + " against " + s); s match { case f() => { println("; matched! returning true"); true }; case _ => { println("; did NOT match. returning false"); false } }}) } } Instantiating it with a pattern like: ^[^:]+://[^.]*\.company[0-9]*9\.com$ (matches a url that has company in the name and a number that ends in the digit 9) Test it in the Scala REPL: scala> val filters = Source.fromFile("D:\\cosmos-modules\\testdata\\fakefilters.txt").getLines.toList scala> val urlFilter = new UrlFilter(filters) scala> urlFilter.matches("ftp://ftp.company9.com") matching ^[^:]+://[^.]*\.company[0-9]*9\.com$ against ftp://ftp.company9.com; matched! returning true res2: Boolean = true Use it in SparkSQL: val urlFilter = new UrlFilter(filters) sqlContext.udf.register("filterListMatch", (url: String) => urlFilter.matches(url)) val nonMatchingUrlsDf = sqlContext.sql("SELECT url FROM distinctUrls WHERE NOT filterListMatch(url)") Look at the debug prints in the console: matching ^[^:]+://[^.]*\.company[0-9]*9\.com$ against ftp://ftp.company9.com ; did NOT match. returning false I have repeated this several times to make sure I'm comparing apples to apples. I am using Spark 1.6 and Scala 2.10.5 with Java 1.8 thanks
Re: Spark-submit: where do --files go?
Thanks Sidney From: Sidney Feiner <sidney.fei...@startapp.com> Sent: Thursday, January 19, 2017 9:52 AM To: jeff saremi Cc: user@spark.apache.org Subject: Re: Spark-submit: where do --files go? Every executor creates a directory with your submitted files and you can access every file's absolute path with the following: val fullFilePath = SparkFiles.get(fileName) On Jan 19, 2017 19:35, jeff saremi <jeffsar...@hotmail.com> wrote: I'd like to know how, from within Java/Spark, I can access the dependent files which I deploy using the "--files" option on the command line?
Re: Spark-submit: where do --files go?
I wish someone added this to the documentation From: jeff saremi <jeffsar...@hotmail.com> Sent: Thursday, January 19, 2017 9:56 AM To: Sidney Feiner Cc: user@spark.apache.org Subject: Re: Spark-submit: where do --files go? Thanks Sidney From: Sidney Feiner <sidney.fei...@startapp.com> Sent: Thursday, January 19, 2017 9:52 AM To: jeff saremi Cc: user@spark.apache.org Subject: Re: Spark-submit: where do --files go? Every executor creates a directory with your submitted files and you can access every file's absolute path with the following: val fullFilePath = SparkFiles.get(fileName) On Jan 19, 2017 19:35, jeff saremi <jeffsar...@hotmail.com> wrote: I'd like to know how, from within Java/Spark, I can access the dependent files which I deploy using the "--files" option on the command line?
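Putting the thread together, the end-to-end pattern looks roughly like this (the file name is a placeholder):

```
// Submitted with something like:
//   spark-submit --files /local/path/lookup.txt --class MyApp myapp.jar
import org.apache.spark.SparkFiles
import scala.io.Source

val path  = SparkFiles.get("lookup.txt")          // resolves to the local copy shipped with the job
val lines = Source.fromFile(path).getLines().toList
```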
Spark-submit: where do --files go?
I'd like to know how, from within Java/Spark, I can access the dependent files which I deploy using the "--files" option on the command line?
RE: Access to broadcasted variable
Is the broadcasted variable distributed to every executor or every worker? Now I'm more confused. I thought it was supposed to save memory by distributing it to every worker, and the executors would share that copy. Date: Fri, 19 Feb 2016 16:48:59 -0800 Subject: Re: Access to broadcasted variable From: shixi...@databricks.com To: jeffsar...@hotmail.com CC: user@spark.apache.org The broadcasted object is serialized in the driver and sent to the executors. And in the executor, it will deserialize the bytes to get the broadcasted object. On Fri, Feb 19, 2016 at 5:54 AM, jeff saremi <jeffsar...@hotmail.com> wrote: Could someone please comment on this? thanks From: jeffsar...@hotmail.com To: user@spark.apache.org Subject: Access to broadcasted variable Date: Thu, 18 Feb 2016 14:44:07 -0500 I'd like to know if the broadcasted object gets serialized when accessed by the executor during the execution of a task? I know that it gets serialized from the driver to the worker. This question is about what happens inside the worker when executor JVMs are accessing it. thanks Jeff
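A minimal sketch of the lifecycle being described: the value is serialized once on the driver, and each executor JVM deserializes it once and shares that copy across its tasks (the lookup map and rdd here are placeholders):

```
val lookup = Map("a" -> 1, "b" -> 2)
val bc = sc.broadcast(lookup)          // serialized on the driver

val counts = rdd.map { key =>
  bc.value.getOrElse(key, 0)           // first access in an executor deserializes it; later tasks reuse that copy
}
```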
RE: Access to broadcasted variable
Could someone please comment on this? thanks From: jeffsar...@hotmail.com To: user@spark.apache.org Subject: Access to broadcasted variable Date: Thu, 18 Feb 2016 14:44:07 -0500 I'd like to know if the broadcasted object gets serialized when accessed by the executor during the execution of a task? I know that it gets serialized from the driver to the worker. This question is about what happens inside the worker when executor JVMs are accessing it. thanks Jeff
Access to broadcasted variable
I'd like to know if the broadcasted object gets serialized when accessed by the executor during the execution of a task? I know that it gets serialized from the driver to the worker. This question is about what happens inside the worker when executor JVMs are accessing it. thanks Jeff
RE: SequenceFile and object reuse
Sandy, Ryan, Andrew Thanks very much. I think i now understand it better. Jeff From: ryan.blake.willi...@gmail.com Date: Thu, 19 Nov 2015 06:00:30 + Subject: Re: SequenceFile and object reuse To: sandy.r...@cloudera.com; jeffsar...@hotmail.com CC: user@spark.apache.org Hey Jeff, in addition to what Sandy said, there are two more reasons that this might not be as bad as it seems; I may be incorrect in my understanding though. First, the "additional step" you're referring to is not likely to be adding any overhead; the "extra map" is really just materializing the data once (as opposed to zero times), which is what you want (assuming your access pattern couldn't be reformulated in the way Sandy described, i.e. where all the objects in a partition don't need to be in memory at the same time). Secondly, even if this was an "extra" map step, it wouldn't add any extra stages to a given pipeline, being a "narrow" dependency, so it would likely be low-cost anyway. Let me know if any of the above seems incorrect, thanks! On Thu, Nov 19, 2015 at 12:41 AM Sandy Ryza <sandy.r...@cloudera.com> wrote: Hi Jeff, Many access patterns simply take the result of hadoopFile and use it to create some other object, and thus have no need for each input record to refer to a different object. In those cases, the current API is more performant than an alternative that would create an object for each record, because it avoids the unnecessary overhead of creating Java objects. As you've pointed out, this is at the expense of making the code more verbose when caching. -Sandy On Fri, Nov 13, 2015 at 10:29 AM, jeff saremi <jeffsar...@hotmail.com> wrote: So we tried reading a sequencefile in Spark and realized that all our records have ended up becoming the same. THen one of us found this: Note: Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggregation or shuffle operation will create many references to the same object. If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first copy them using a map function. Is there anyone that can shed some light on this bizzare behavior and the decisions behind it? And I also would like to know if anyone's able to read a binary file and not to incur the additional map() as suggested by the above? What format did you use? thanksJeff
SequenceFile and object reuse
So we tried reading a SequenceFile in Spark and realized that all our records have ended up becoming the same. Then one of us found this: Note: Because Hadoop's RecordReader class re-uses the same Writable object for each record, directly caching the returned RDD or directly passing it to an aggregation or shuffle operation will create many references to the same object. If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first copy them using a map function. Is there anyone that can shed some light on this bizarre behavior and the decisions behind it? And I also would like to know if anyone's been able to read a binary file and not incur the additional map() as suggested by the above? What format did you use? thanks Jeff
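What the quoted note amounts to in code, assuming Text keys and values (the path is a placeholder): copy each record out of the reused Writable before anything caches or shuffles it.

```
import org.apache.hadoop.io.Text

val raw  = sc.sequenceFile("hdfs:///some/path", classOf[Text], classOf[Text])
val safe = raw.map { case (k, v) => (k.toString, v.toString) }   // materialize per-record copies
safe.cache()                                                     // caching no longer aliases one reused object
```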
RE: How to install a Spark Package?
Yes, those. Here's Avro's, for example: spark-shell --packages com.databricks:spark-avro_2.10:2.0.1 The way I read this is that this line would instruct Spark to go and get the package. But does that mean installation? Is this permanent? Do I need to specify it once, or each time? Will this be downloaded each time? What if the environment that I'm running my Spark in does not allow such a connection? If I have downloaded these packages, is there a way to install them permanently? Subject: Re: How to install a Spark Package? From: yuzhih...@gmail.com Date: Sun, 4 Oct 2015 21:05:44 -0700 CC: user@spark.apache.org To: jeffsar...@hotmail.com Are you talking about a package which is listed on http://spark-packages.org? The package should come with installation instructions, right? On Oct 4, 2015, at 8:55 PM, jeff saremi <jeffsar...@hotmail.com> wrote: So that it is available even in offline mode? I can't seem to be able to find any notes on that. thanks jeff
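One common offline approach, hedged since it depends on the environment: --packages resolves through Ivy and caches the jars locally (typically under ~/.ivy2), so repeated runs don't re-download; in a fully disconnected environment you can fetch the jars once elsewhere and point Spark at them explicitly, for example:

```
spark-shell --jars /opt/jars/spark-avro_2.10-2.0.1.jar
```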
How to install a Spark Package?
So that it is available even in offline mode? I can't seem to be able to find any notes on that. thanks jeff
How to make sense of Spark log entries
There are executor logs and driver logs. Most of them are not intuitive enough to mean anything to us. Are there any notes, documents, or talks on how to decipher these logs and troubleshoot our applications' performance as a result? thanks Jeff
pyspark question: create RDD from csr_matrix
I've tried desperately to create an RDD from a matrix I have. Every combination failed. I have a sparse matrix returned from a call to dv = DictVectorizer(); sv_tf = dv.fit_transform(tf), which is supposed to be a matrix of document terms and their frequencies. I need to convert this to an RDD so I can feed it to pyspark functions such as IDF().fit(). I tried applying Vectors.sparse(??, sv_tf) but I didn't know what the dimension should be. I tried doing sc.parallelize(sv_tf), which didn't work either. I tried both of the above methods with sv_tf.toarray(). Again no luck. thanks Jeff
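A hedged, untested sketch of one way to bridge the gap: convert each row of the scipy csr_matrix into a pyspark SparseVector and parallelize those, so IDF().fit() gets an RDD of vectors. sv_tf and sc come from the post; row_to_vector is a made-up helper.

```
from pyspark.mllib.feature import IDF
from pyspark.mllib.linalg import Vectors

def row_to_vector(row):
    # row is a 1 x n slice of the csr_matrix; indices/data hold its non-zero entries
    pairs = sorted(zip(row.indices.tolist(), row.data.tolist()))
    return Vectors.sparse(row.shape[1], pairs)

tf_rdd = sc.parallelize([row_to_vector(sv_tf[i]) for i in range(sv_tf.shape[0])])
tfidf  = IDF().fit(tf_rdd).transform(tf_rdd)
```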