Re: Spark Number of Partitions Recommendations
Yes, I forgot to mention that I chose a prime number as the modulus for the hash function because my keys are usually strings, and Spark computes a key's partition from the key's hash (see HashPartitioner.scala). So, to avoid a large number of collisions (where many keys land in only a few partitions), it is common to use a prime number as the modulus. Of course this only makes sense for String keys, because of the hash function; if your key type uses a different hash function, you can use any other modulus instead of a prime. I like this discussion of the topic: http://stackoverflow.com/questions/1145217/why-should-hash-functions-use-a-prime-number-modulus

--
Yandex.Mail - reliable mail http://mail.yandex.ru/neo2/collect/?exp=1&t=1

02.08.2015, 00:14, "Ruslan Dautkhanov":
> You should also take into account the amount of memory that you plan to use.
> It's advised not to give too much memory to each executor .. otherwise GC
> overhead will go up.
>
> Btw, why prime numbers?
>
> --
> Ruslan Dautkhanov
>
> On Wed, Jul 29, 2015 at 3:31 AM, ponkin wrote:
>> Hi Rahul,
>>
>> Where did you see such a recommendation?
>> I personally define partitions with the following formula
>>
>> partitions = nextPrimeNumberAbove( K * (--num-executors * --executor-cores) )
>>
>> where
>> nextPrimeNumberAbove(x) - the prime number which is greater than x
>> K - a multiplier; start with 1 and increase it until join
>> performance starts to degrade
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Number-of-Partitions-Recommendations-tp24022p24059.html
>>
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
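For context, a minimal sketch of the partitioning logic being referenced - a paraphrase of the idea in HashPartitioner.scala, not the verbatim Spark source (it omits, e.g., the null-key case):

  // Partition index = key's hashCode modulo numPartitions, forced non-negative.
  // With a prime numPartitions, hash codes that share a common factor are less
  // likely to pile into the same few partitions.
  def getPartition(key: Any, numPartitions: Int): Int = {
    val rawMod = key.hashCode % numPartitions
    rawMod + (if (rawMod < 0) numPartitions else 0)
  }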
Re: TCP/IP speedup
If your network is bandwidth-bound, enabling jumbo frames (MTU 9000) may increase bandwidth by up to ~20%. http://docs.hortonworks.com/HDP2Alpha/index.htm#Hardware_Recommendations_for_Hadoop.htm
"Enabling Jumbo Frames across the cluster improves bandwidth"
If the Spark workload is not network bandwidth-bound, I'd expect anywhere from a few percent improvement to none at all.

--
Ruslan Dautkhanov

On Sat, Aug 1, 2015 at 6:08 PM, Simon Edelhaus wrote:
> H
>
> 2% huh.
>
> -- ttfn
> Simon Edelhaus
> California 2015
>
> On Sat, Aug 1, 2015 at 3:45 PM, Mark Hamstra wrote:
>
>> https://spark-summit.org/2015/events/making-sense-of-spark-performance/
>>
>> On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus wrote:
>>
>>> Hi All!
>>>
>>> How important would a significant performance improvement to TCP/IP
>>> itself be, in terms of overall job performance? Which part would be most
>>> significantly accelerated? Would it be HDFS?
>>>
>>> -- ttfn
>>> Simon Edelhaus
>>> California 2015
Re: TCP/IP speedup
H

2% huh.

-- ttfn
Simon Edelhaus
California 2015

On Sat, Aug 1, 2015 at 3:45 PM, Mark Hamstra wrote:
> https://spark-summit.org/2015/events/making-sense-of-spark-performance/
>
> On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus wrote:
>
>> Hi All!
>>
>> How important would a significant performance improvement to TCP/IP
>> itself be, in terms of overall job performance? Which part would be most
>> significantly accelerated? Would it be HDFS?
>>
>> -- ttfn
>> Simon Edelhaus
>> California 2015
Re: TCP/IP speedup
https://spark-summit.org/2015/events/making-sense-of-spark-performance/

On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus wrote:
> Hi All!
>
> How important would a significant performance improvement to TCP/IP
> itself be, in terms of overall job performance? Which part would be most
> significantly accelerated? Would it be HDFS?
>
> -- ttfn
> Simon Edelhaus
> California 2015
TCP/IP speedup
Hi All!

How important would a significant performance improvement to TCP/IP itself be, in terms of overall job performance? Which part would be most significantly accelerated? Would it be HDFS?

-- ttfn
Simon Edelhaus
California 2015
Re: How does the # of tasks affect # of threads?
1. I believe that the default memory per executor is 512m (from the documentation)
2. I have increased the memory used by Spark on the workers in my launch script when submitting the job (--executor-memory 124g)
3. The job completes successfully; it is the "road bumps" in the middle I am concerned with

I would like insight into how Spark handles thread creation.

On Sat, Aug 1, 2015 at 5:33 PM, Fabrice Sznajderman wrote:
> Hello,
>
> I am not an expert with Spark, but the error thrown by Spark seems to
> indicate that there is not enough memory to launch the job. By default, Spark
> allocates 1GB of memory; maybe you should increase it?
>
> Best regards
>
> Fabrice
>
> On Sat, Aug 1, 2015 at 22:51, Connor Zanin wrote:
>
>> Hello,
>>
>> I am having an issue when I run a word count job. I have included the
>> source and log files for reference. The job finishes successfully, but
>> about halfway through I get a java.lang.OutOfMemoryError (could not create
>> native thread), and this leads to the loss of the executor. After some
>> searching I found out this was a problem with the environment and the OS
>> limit on how many threads I could spawn.
>>
>> However, I had thought that Spark only maintained a thread pool equal in
>> size to the number of cores available across the nodes (by default), and
>> scheduled tasks dynamically as threads become available. The only Spark
>> parameter I change is the number of partitions in my RDD.
>>
>> My question is, how is Spark deciding how many threads to spawn and when?
>>
>> --
>> Regards,
>>
>> Connor Zanin
>> Computer Science
>> University of Delaware

--
Regards,

Connor Zanin
Computer Science
University of Delaware
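For what it's worth, a hedged sketch of the concurrency arithmetic (the flags, numbers, and jar name here are hypothetical, not taken from the poster's script): with

  ./spark-submit --num-executors 4 --executor-cores 8 --executor-memory 124g wordcount.jar

Spark would run at most 4 * 8 = 32 concurrent task threads, regardless of how many partitions (tasks) the RDD has - extra tasks queue up. But each executor JVM also creates internal threads (shuffle/Netty IO, RPC, and so on), so the process thread count exceeds the task thread count, and "could not create native thread" usually points at the OS per-user thread/process limit rather than a full heap.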
Re: How does the # of tasks affect # of threads?
Hello,

I am not an expert with Spark, but the error thrown by Spark seems to indicate that there is not enough memory to launch the job. By default, Spark allocates 1GB of memory; maybe you should increase it?

Best regards

Fabrice

On Sat, Aug 1, 2015 at 22:51, Connor Zanin wrote:
> Hello,
>
> I am having an issue when I run a word count job. I have included the
> source and log files for reference. The job finishes successfully, but
> about halfway through I get a java.lang.OutOfMemoryError (could not create
> native thread), and this leads to the loss of the executor. After some
> searching I found out this was a problem with the environment and the OS
> limit on how many threads I could spawn.
>
> However, I had thought that Spark only maintained a thread pool equal in
> size to the number of cores available across the nodes (by default), and
> scheduled tasks dynamically as threads become available. The only Spark
> parameter I change is the number of partitions in my RDD.
>
> My question is, how is Spark deciding how many threads to spawn and when?
>
> --
> Regards,
>
> Connor Zanin
> Computer Science
> University of Delaware
Re: Spark Number of Partitions Recommendations
You should also take into account the amount of memory that you plan to use. It's advised not to give too much memory to each executor .. otherwise GC overhead will go up.

Btw, why prime numbers?

--
Ruslan Dautkhanov

On Wed, Jul 29, 2015 at 3:31 AM, ponkin wrote:
> Hi Rahul,
>
> Where did you see such a recommendation?
> I personally define partitions with the following formula
>
> partitions = nextPrimeNumberAbove( K * (--num-executors * --executor-cores) )
>
> where
> nextPrimeNumberAbove(x) - the prime number which is greater than x
> K - a multiplier; start with 1 and increase it until join
> performance starts to degrade
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Number-of-Partitions-Recommendations-tp24022p24059.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
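A minimal sketch of this formula in Scala, assuming a naive trial-division primality test (plenty fast for numbers of this size):

  def isPrime(n: Int): Boolean =
    n > 1 && (2 to math.sqrt(n).toInt).forall(n % _ != 0)

  // The smallest prime strictly greater than x.
  def nextPrimeNumberAbove(x: Int): Int =
    Iterator.from(x + 1).find(isPrime).get

  // e.g. K = 2 with 10 executors and 4 cores each:
  val partitions = nextPrimeNumberAbove(2 * 10 * 4)  // 83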
Re: No event logs in yarn-cluster mode
On Sat, Aug 1, 2015 at 9:25 AM, Akmal Abbasov wrote:
> When I run locally (./run-example SparkPi), the event logs are created,
> and I can start the history server.
> But when I try
> ./spark-submit --class org.apache.spark.examples.SparkPi --master
> yarn-cluster file:///opt/hadoop/spark/examples/src/main/python/pi.py

Did you look for the event log on the machine where the Spark driver ran? You're using a "file:" URL, and in yarn-cluster mode the driver runs on some arbitrary machine in the cluster, not the local machine launching the job. Which is why you should probably write these logs to HDFS.
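For example, a minimal sketch of an HDFS-backed setup (the /spark-events directory name is hypothetical, and the directory must already exist on HDFS):

  # spark-defaults.conf
  spark.eventLog.enabled   true
  spark.eventLog.dir       hdfs:///spark-events

  # spark-env.sh, so the history server reads from the same place
  export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs:///spark-events"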
Re: No event logs in yarn-cluster mode
Hi Akmal,

It might be on HDFS, since you provided the path /opt/spark/spark-events to `spark.eventLog.dir` without a filesystem scheme, so it may have been resolved against the default filesystem.

-Andrew

2015-08-01 9:25 GMT-07:00 Akmal Abbasov:
> Hi, I am trying to configure a history server for the application.
> When I run locally (./run-example SparkPi), the event logs are created,
> and I can start the history server.
> But when I try
> ./spark-submit --class org.apache.spark.examples.SparkPi --master
> yarn-cluster file:///opt/hadoop/spark/examples/src/main/python/pi.py
> I get
> 15/08/01 18:18:50 INFO yarn.Client:
> client token: N/A
> diagnostics: N/A
> ApplicationMaster host: 192.168.56.192
> ApplicationMaster RPC port: 0
> queue: default
> start time: 1438445890676
> final status: SUCCEEDED
> tracking URL: http://sp-m1:8088/proxy/application_1438444529840_0009/A
> user: hadoop
> 15/08/01 18:18:50 INFO util.Utils: Shutdown hook called
> 15/08/01 18:18:50 INFO util.Utils: Deleting directory
> /tmp/spark-185f7b83-cb3b-4134-a10c-452366204f74
> So it succeeded, but there are no event logs for this application.
>
> Here are my configs
>
> spark-defaults.conf
> spark.master             yarn-cluster
> spark.eventLog.dir       /opt/spark/spark-events
> spark.eventLog.enabled   true
>
> spark-env.sh
> export HADOOP_CONF_DIR="/opt/hadoop/etc/hadoop"
> export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
> -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"
> export SPARK_HISTORY_OPTS="-Dspark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
> -Dspark.history.fs.logDirectory=file:/opt/spark/spark-events
> -Dspark.history.fs.cleaner.enabled=true"
>
> Any ideas?
>
> Thank you
Re: Does anyone have experience with using Hadoop InputFormats?
Sent from my iPad

On 2014-9-24, at 8:13 AM, Steve Lewis wrote:
> When I experimented with using an InputFormat I had used in Hadoop for a
> long time, I found
> 1) it must extend org.apache.hadoop.mapred.FileInputFormat (the deprecated
> class, not org.apache.hadoop.mapreduce.lib.input.FileInputFormat)
> 2) initialize needs to be called in the constructor
> 3) the type - mine extends FileInputFormat - must not be a
> Hadoop Writable; those are not serializable, but extending
> FileInputFormat does work - I don't think this is
> allowed in Hadoop
>
> Are these statements correct? If so, it seems like most Hadoop InputFormats
> - certainly the custom ones I create - require serious modifications to work.
> Does anyone have samples of use of a Hadoop InputFormat?
>
> Since I am working with problems where a directory with multiple files is
> processed, and some files are many gigabytes in size with multi-line complex
> records, an input format is a requirement.
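For reference, a minimal sketch of consuming a Hadoop InputFormat from Spark in Scala (using the stock TextInputFormat and a hypothetical path; a custom format would substitute its own key/value/format types). The key point is copying data out of the Writables immediately, since Hadoop reuses them and they are not serializable - assuming sc is an existing SparkContext:

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

  // Read via the new-API InputFormat, then convert to serializable types right away.
  val rdd = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input")
    .map { case (offset, line) => (offset.get, line.toString) }

(sc.hadoopFile is the analogous call for formats based on the old org.apache.hadoop.mapred API.)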
No event logs in yarn-cluster mode
Hi, I am trying to configure a history server for the application.
When I run locally (./run-example SparkPi), the event logs are created, and I can start the history server.
But when I try
./spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster file:///opt/hadoop/spark/examples/src/main/python/pi.py
I get
15/08/01 18:18:50 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 192.168.56.192
ApplicationMaster RPC port: 0
queue: default
start time: 1438445890676
final status: SUCCEEDED
tracking URL: http://sp-m1:8088/proxy/application_1438444529840_0009/A
user: hadoop
15/08/01 18:18:50 INFO util.Utils: Shutdown hook called
15/08/01 18:18:50 INFO util.Utils: Deleting directory /tmp/spark-185f7b83-cb3b-4134-a10c-452366204f74
So it succeeded, but there are no event logs for this application.

Here are my configs

spark-defaults.conf
spark.master             yarn-cluster
spark.eventLog.dir       /opt/spark/spark-events
spark.eventLog.enabled   true

spark-env.sh
export HADOOP_CONF_DIR="/opt/hadoop/etc/hadoop"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"
export SPARK_HISTORY_OPTS="-Dspark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider -Dspark.history.fs.logDirectory=file:/opt/spark/spark-events -Dspark.history.fs.cleaner.enabled=true"

Any ideas?

Thank you
About memory leak in spark 1.4.1
Hi, all

I upgraded Spark to 1.4.1 and many applications failed... I find the heap memory is not full, but the CoarseGrainedExecutorBackend process takes more memory than I expect, and it keeps increasing over time until it exceeds the server's limit and the worker dies.

Can anyone help?

Mode: standalone
spark.executor.memory 50g

From top:
25583 xiaoju 20 0 75.5g 55g 28m S 1729.3 88.1 2172:52 java

The 55g resident size is more than the 50g I requested.
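One possible mitigation, assuming the extra residency comes from direct (off-heap) Netty buffers used by the shuffle - a sketch only, and the 4g cap is an arbitrary example value:

  # spark-defaults.conf
  spark.executor.extraJavaOptions    -XX:MaxDirectMemorySize=4g
  spark.shuffle.io.preferDirectBufs  false

By default the JVM allows direct buffers up to roughly the heap size, so a 50g heap can legitimately grow well past 50g of resident memory.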
Re: Spark SQL DataFrame: Nullable column and filtering
Dear all,

after some fiddling I have arrived at this solution:

/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {

  val joinedDF = leftDF.as('left).join(rightDF.as('right),
    leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

  import joinedDF.sqlContext.implicits._
  val leftColumns = leftDF.columns
    .map((cn: String) => $"left.$cn")
  val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName))
    .map((cn: String) => $"right.$cn")

  joinedDF.select(leftColumns ++ rightColumns: _*)
}

Comments welcome.

Alternatives I tried:

- Not working: if at least the alias 'right for rightDF is present, one could try joinedDF.drop("right." + commonColumnName), but this does not work (no column is dropped). Unfortunately, drop does not support arguments of type Column / ColumnName. @Michael: Should I create a feature request in Jira for drop supporting Columns?

- Working: without using aliases via as(...), using column renaming instead: rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName) to rename the right dataframe's column, and then writing the join criterion as leftDF(commonColumnName) === rightDF("right_" + commonColumnName). In my opinion not so neat. Opinions?

Things I observed:

- Column handling does not seem consistent: select() supports aliases, while drop(...) only supports strings. DataFrame.apply() and DataFrame.col also do not support aliases. Thus the only way to handle ambiguous column names at the moment is via select. Can someone please confirm this!
- Alias information is not displayed via DataFrame.printSchema (or at least I did not find a way to show it).

Cheers,

Martin

2015-07-31 22:51 GMT+02:00 Martin Senne:
> Dear Michael, dear all,
>
> a minimal example is listed below.
>
> After some further analysis I could figure out that the problem is
> related to the leftOuterJoinWithRemovalOfEqualColumn method, as I use
> columns of the left and right dataframes when doing the select on the
> joined table.
>
> /**
>  * Customized left outer join on common column.
>  */
> def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
>
>   val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
>   val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
>   leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
>     .select(leftColumns ++ rightColumns: _*)
> }
>
> As the column "y" of the right table has nullable=false, this is then also
> transferred to the y column of the joined table, as I use rightDF("y").
>
> Thus, I need to use columns of the joined table for the select.
>
> Question now: the joined table has column names "x", "a", "x", "y". How do I
> discard the second x column?
>
> All my approaches failed (assuming here that joinedDF is the joined DataFrame):
>
> - Using joinedDF.drop("x") discards both "x" columns.
> - Using joinedDF("x") does not work, as it is ambiguous.
> - Also using rightDF.as("aliasname") in order to differentiate the
>   column "x" (from the left DataFrame) from "x" (from the right DataFrame) did not
>   work out, as I found no way to use select($"aliasname.x") really
>   programmatically. Could someone sketch the code?
>
> Any help welcome, thanks
>
> Martin
>
> import org.apache.spark.sql.types.{StructField, StructType}
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.sql.{DataFrame, SQLContext}
>
> object OtherEntities {
>
>   case class Record( x: Int, a: String)
>   case class Mapping( x: Int, y: Int )
>
>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
>   val mappings = Seq( Mapping(2, 5) )
> }
>
> object MinimalShowcase {
>
>   /**
>    * Customized left outer join on common column.
>    */
>   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
>
>     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
>     val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
>     leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
>       .select(leftColumns ++ rightColumns: _*)
>   }
>
>   /**
>    * Set, if a column is nullable.
>    * @param df source DataFrame
>    * @param cn is the column name to change
>    * @param nullable is the flag to set, such that the column is either nullable or not
>    */
>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
>
>     val schema = df.sche
Re: flatMap output on disk / flatMap memory overhead
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in serialized format, "MEMORY_AND_DISK_SER" or "MEMORY_ONLY_SER"? i.e. changing your rdd.persist(MEMORY_AND_DISK) to rdd.persist(MEMORY_ONLY_SER)?

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid wrote:
> I agree with Richard. It looks like the issue here is shuffling, and
> shuffle data is always written to disk, so the issue is definitely not that
> all the output of flatMap has to be stored in memory.
>
> If at all possible, I'd first suggest upgrading to a new version of Spark --
> even in 1.2, there were big improvements to shuffle, with sort-based
> shuffle as the default.
>
> On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher wrote:
>
>> Are you sure it's memory related? What is the disk utilization and IO
>> performance on the workers? The error you posted looks to be related to
>> shuffle trying to obtain block data from another worker node and failing to
>> do so in a reasonable amount of time. It may still be memory related, but I'm
>> not sure that other resources are ruled out yet.
>>
>> On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea <
>> octavian.ga...@inf.ethz.ch> wrote:
>>
>>> I tried using reduceByKey, without success.
>>>
>>> I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
>>> However, I got the same error as before, namely the error described here:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
>>>
>>> My task is to count the frequencies of pairs of words that occur in a set of
>>> documents at least 5 times. I know that this final output is sparse and
>>> should comfortably fit in memory. However, the intermediate pairs that are
>>> spilled by flatMap might need to be stored on disk, but I don't
>>> understand why the persist option does not work and my job fails.
>>>
>>> My code:
>>>
>>> rdd.persist(StorageLevel.MEMORY_AND_DISK)
>>>    .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1,word2) , 1)
>>>    .reduceByKey((a,b) => (a + b).toShort)
>>>    .filter({case((x,y),count) => count >= 5})
>>>
>>> My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One
>>> node I keep for the master, 7 nodes for the workers.
>>>
>>> my conf:
>>>
>>> conf.set("spark.cores.max", "128")
>>> conf.set("spark.akka.frameSize", "1024")
>>> conf.set("spark.executor.memory", "115g")
>>> conf.set("spark.shuffle.file.buffer.kb", "1000")
>>>
>>> my spark-env.sh:
>>> ulimit -n 20
>>> SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
>>> SPARK_DRIVER_MEMORY=129G
>>>
>>> spark version: 1.1.1
>>>
>>> Thank you a lot for your help!
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
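A minimal sketch of that suggestion in runnable form, assuming rdd is any existing RDD (StorageLevel must be imported for the _SER levels):

  import org.apache.spark.storage.StorageLevel

  rdd.persist(StorageLevel.MEMORY_ONLY_SER)      // serialized in memory; partitions that don't fit are recomputed
  // or
  rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)  // serialized in memory, spilled to disk when memory is full

Serialized storage trades CPU (for serialization/deserialization) against a much smaller memory footprint, which can reduce GC pressure in jobs like this one.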