Re: Spark Number of Partitions Recommendations

2015-08-01 Thread Понькин Алексей
Yes, I forgot to mention
I chose a prime number as the modulus for the hash function because my keys are usually 
strings, and Spark computes a key's partition from its hash (see 
HashPartitioner.scala). So, to avoid a large number of collisions (many keys 
landing in only a few partitions), it is common to use a prime number as the modulus. But it 
makes sense only for String keys, of course, because of their hash function. If you 
have a different hash function for keys of a different type, you can use any other 
modulus instead of a prime number.
I like this discussion of the topic:
http://stackoverflow.com/questions/1145217/why-should-hash-functions-use-a-prime-number-modulus
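
For illustration, here is a minimal sketch (my own code, not Spark's source or anyone's actual job): HashPartitioner effectively assigns a key to partition hash(key) mod numPartitions, and nextPrimeAbove below is a hypothetical stand-in for the nextPrimeNumberAbove(x) helper mentioned in the formula quoted further down.

// Sketch only; nextPrimeAbove and the example values are assumptions.
object PartitionSizingSketch {

  // Smallest prime strictly greater than n.
  def nextPrimeAbove(n: Int): Int = {
    def isPrime(x: Int): Boolean =
      x > 1 && (2 to math.sqrt(x).toInt).forall(x % _ != 0)
    Iterator.from(n + 1).find(isPrime).get
  }

  // What HashPartitioner effectively does with a key's hashCode.
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod // keep the index non-negative
  }

  def main(args: Array[String]): Unit = {
    val numExecutors  = 10 // --num-executors (example value)
    val executorCores = 4  // --executor-cores (example value)
    val k             = 2  // the multiplier K, tuned empirically
    val partitions = nextPrimeAbove(k * numExecutors * executorCores)
    println(s"partitions = $partitions")           // 83 for these values
    println(partitionFor("some-key", partitions))  // partition index of a String key
  }
}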




02.08.2015, 00:14, "Ruslan Dautkhanov" :
> You should also take into account the amount of memory that you plan to use.
> It's advised not to give too much memory to each executor .. otherwise GC 
> overhead will go up.
>
> Btw, why prime numbers?
>
> --
> Ruslan Dautkhanov
>
> On Wed, Jul 29, 2015 at 3:31 AM, ponkin  wrote:
>> Hi Rahul,
>>
>> Where did you see such a recommendation?
>> I personally define partitions with the following formula
>>
>> partitions = nextPrimeNumberAbove( K*(--num-executors * --executor-cores ) )
>>
>> where
>> nextPrimeNumberAbove(x) - the smallest prime number greater than x
>> K - a multiplier to tune: start with 1 and increase it until join
>> performance starts to degrade
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Number-of-Partitions-Recommendations-tp24022p24059.html
>>
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>




Re: TCP/IP speedup

2015-08-01 Thread Ruslan Dautkhanov
If your network is bandwidth-bound, enabling jumbo frames (MTU 9000)
may increase bandwidth by up to ~20%.

http://docs.hortonworks.com/HDP2Alpha/index.htm#Hardware_Recommendations_for_Hadoop.htm
"Enabling Jumbo Frames across the cluster improves bandwidth"

If the Spark workload is not network bandwidth-bound, I would expect anywhere
from a few percent improvement to none at all.



-- 
Ruslan Dautkhanov

On Sat, Aug 1, 2015 at 6:08 PM, Simon Edelhaus  wrote:

> H
>
> 2% huh.
>
>
> -- ttfn
> Simon Edelhaus
> California 2015
>
> On Sat, Aug 1, 2015 at 3:45 PM, Mark Hamstra 
> wrote:
>
>> https://spark-summit.org/2015/events/making-sense-of-spark-performance/
>>
>> On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus  wrote:
>>
>>> Hi All!
>>>
>>> How important would a significant performance improvement to TCP/IP itself
>>> be, in terms of overall job performance? Which part would be accelerated
>>> most significantly?
>>> Would it be HDFS?
>>>
>>> -- ttfn
>>> Simon Edelhaus
>>> California 2015
>>>
>>
>>
>


Re: TCP/IP speedup

2015-08-01 Thread Simon Edelhaus
H

2% huh.


-- ttfn
Simon Edelhaus
California 2015

On Sat, Aug 1, 2015 at 3:45 PM, Mark Hamstra 
wrote:

> https://spark-summit.org/2015/events/making-sense-of-spark-performance/
>
> On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus  wrote:
>
>> Hi All!
>>
>> How important would a significant performance improvement to TCP/IP itself
>> be, in terms of overall job performance? Which part would be accelerated
>> most significantly?
>> Would it be HDFS?
>>
>> -- ttfn
>> Simon Edelhaus
>> California 2015
>>
>
>


Re: TCP/IP speedup

2015-08-01 Thread Mark Hamstra
https://spark-summit.org/2015/events/making-sense-of-spark-performance/

On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus  wrote:

> Hi All!
>
> How important would a significant performance improvement to TCP/IP itself
> be, in terms of overall job performance? Which part would be accelerated
> most significantly?
> Would it be HDFS?
>
> -- ttfn
> Simon Edelhaus
> California 2015
>


TCP/IP speedup

2015-08-01 Thread Simon Edelhaus
Hi All!

How important would a significant performance improvement to TCP/IP itself be,
in terms of overall job performance? Which part would be accelerated most
significantly?
Would it be HDFS?

-- ttfn
Simon Edelhaus
California 2015


Re: How does the # of tasks affect # of threads?

2015-08-01 Thread Connor Zanin
1. I believe that the default memory (per executor) is 512m (from the
documentation)
2. I have increased the memory used by Spark on the workers in my launch script
when submitting the job
   (--executor-memory 124g)
3. The job completes successfully; it is the "road bumps" in the middle that I
am concerned with

I would like insight into how Spark handles thread creation.
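
For reference, a rough sketch (illustrative values, not my actual job) of the settings involved: the partition count only fixes the total number of tasks, while the number of tasks running at the same time per executor is bounded by the executor's cores; additional threads come from Spark's internal pools (shuffle, networking, etc.).

// Sketch with assumed paths and values.
import org.apache.spark.{SparkConf, SparkContext}

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wordcount-thread-sketch")
      .set("spark.executor.cores", "16") // concurrent task slots per executor
      .set("spark.task.cpus", "1")       // cores claimed by each task
    val sc = new SparkContext(conf)

    // 10000 partitions => 10000 tasks in total, but only
    // executors * (executor.cores / task.cpus) of them run simultaneously.
    val lines = sc.textFile("hdfs:///path/to/input", 10000)
    val counts = lines
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _, 200) // fewer reduce-side partitions/tasks
    counts.saveAsTextFile("hdfs:///path/to/output")
    sc.stop()
  }
}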

On Sat, Aug 1, 2015 at 5:33 PM, Fabrice Sznajderman 
wrote:

> Hello,
>
> I am not an expert with Spark, but the error thrown by Spark seems to
> indicate that there is not enough memory for launching the job. By default,
> Spark allocates 1GB of memory; maybe you should increase it?
>
> Best regards
>
> Fabrice
>
> On Sat, Aug 1, 2015 at 22:51, Connor Zanin  wrote:
>
>> Hello,
>>
>> I am having an issue when I run a word count job. I have included the
>> source and log files for reference. The job finishes successfully, but
>> about halfway through I get a java.lang.OutOfMemoryError (could not create
>> native thread), and this leads to the loss of the Executor. After some
>> searching I found out this was a problem with the environment and the limit
>> imposed by the OS on how many threads I could spawn.
>>
>> However, I had thought that Spark only maintained a thread pool equal in
>> size to the number of cores available across the nodes (by default), and
>> schedules tasks dynamically as threads become available. The only Spark
>> parameter I change is the number of partitions in my RDD.
>>
>> My question is, how is Spark deciding how many threads to spawn and when?
>>
>> --
>> Regards,
>>
>> Connor Zanin
>> Computer Science
>> University of Delaware
>>
>>
>>
>> --
>> Regards,
>>
>> Connor Zanin
>> Computer Science
>> University of Delaware
>>
>
>


-- 
Regards,

Connor Zanin
Computer Science
University of Delaware


Re: How does the # of tasks affect # of threads?

2015-08-01 Thread Fabrice Sznajderman
Hello,

I am not an expert with Spark, but the error thrown by Spark seems to indicate
that there is not enough memory for launching the job. By default, Spark
allocates 1GB of memory; maybe you should increase it?

Best regards

Fabrice

On Sat, Aug 1, 2015 at 22:51, Connor Zanin  wrote:

> Hello,
>
> I am having an issue when I run a word count job. I have included the
> source and log files for reference. The job finishes successfully, but
> about halfway through I get a java.lang.OutOfMemoryError (could not create
> native thread), and this leads to the loss of the Executor. After some
> searching I found out this was a problem with the environment and the limit
> imposed by the OS on how many threads I could spawn.
>
> However, I had thought that Spark only maintained a thread pool equal in
> size to the number of cores available across the nodes (by default), and
> schedules tasks dynamically as threads become available. The only Spark
> parameter I change is the number of partitions in my RDD.
>
> My question is, how is Spark deciding how many threads to spawn and when?
>
> --
> Regards,
>
> Connor Zanin
> Computer Science
> University of Delaware
>
>
>
> --
> Regards,
>
> Connor Zanin
> Computer Science
> University of Delaware
>


Re: Spark Number of Partitions Recommendations

2015-08-01 Thread Ruslan Dautkhanov
You should also take into account the amount of memory that you plan to use.
It's advised not to give too much memory to each executor .. otherwise GC
overhead will go up.

Btw, why prime numbers?



-- 
Ruslan Dautkhanov

On Wed, Jul 29, 2015 at 3:31 AM, ponkin  wrote:

> Hi Rahul,
>
> Where did you see such a recommendation?
> I personally define partitions with the following formula
>
> partitions = nextPrimeNumberAbove( K*(--num-executors * --executor-cores )
> )
>
> where
> nextPrimeNumberAbove(x) - the smallest prime number greater than x
> K - a multiplier to tune: start with 1 and increase it until join
> performance starts to degrade
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Number-of-Partitions-Recommendations-tp24022p24059.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: No event logs in yarn-cluster mode

2015-08-01 Thread Marcelo Vanzin
On Sat, Aug 1, 2015 at 9:25 AM, Akmal Abbasov 
wrote:

> When I run it locally (./run-example SparkPi), the event logs are being
> created, and I can start the history server.
> But when I am trying
> ./spark-submit --class org.apache.spark.examples.SparkPi --master
> yarn-cluster file:///opt/hadoop/spark/examples/src/main/python/pi.py
>

Did you look for the event log on the machine where the Spark driver ran?
You're using a "file:" URL, and in yarn-cluster mode the driver runs on some
arbitrary machine in the cluster, not on the local machine launching the job.

Which is why you should probably write these logs to HDFS.
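
For example (a sketch only; the hdfs:// path is an assumption), the application and the history server can both point at an HDFS directory so the logs are found no matter which node hosted the driver:

// Sketch: write event logs to HDFS instead of a local file: URL.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("SparkPi")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///var/log/spark/spark-events")
val sc = new SparkContext(conf)
// ... run the job ...
sc.stop()

// The history server then needs the matching directory, e.g.
//   -Dspark.history.fs.logDirectory=hdfs:///var/log/spark/spark-events
// in SPARK_HISTORY_OPTS, instead of a file: URL on a local disk.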


Re: No event logs in yarn-cluster mode

2015-08-01 Thread Andrew Or
Hi Akmal,

It might be on HDFS, since the path you provided to `spark.eventLog.dir`,
/opt/spark/spark-events, has no scheme (no file:// or hdfs:// prefix).

-Andrew

2015-08-01 9:25 GMT-07:00 Akmal Abbasov :

> Hi, I am trying to configure a history server for my application.
> When I run it locally (./run-example SparkPi), the event logs are being
> created, and I can start the history server.
> But when I am trying
> ./spark-submit --class org.apache.spark.examples.SparkPi --master
> yarn-cluster file:///opt/hadoop/spark/examples/src/main/python/pi.py
> I am getting
> 15/08/01 18:18:50 INFO yarn.Client:
> client token: N/A
> diagnostics: N/A
> ApplicationMaster host: 192.168.56.192
> ApplicationMaster RPC port: 0
> queue: default
> start time: 1438445890676
> final status: SUCCEEDED
> tracking URL: http://sp-m1:8088/proxy/application_1438444529840_0009/A
> user: hadoop
> 15/08/01 18:18:50 INFO util.Utils: Shutdown hook called
> 15/08/01 18:18:50 INFO util.Utils: Deleting directory
> /tmp/spark-185f7b83-cb3b-4134-a10c-452366204f74
> So it is succeeded, but there is no event logs for this application.
>
> here are my configs
> spark-defaults.conf
> spark.master yarn-cluster
> spark.eventLog.dir   /opt/spark/spark-events
> spark.eventLog.enabled  true
>
> spark-env.sh
> export HADOOP_CONF_DIR="/opt/hadoop/etc/hadoop"
> export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
> -Dspark.deploy.zookeeper.url="zk1:2181,zk2:2181"
> export
> SPARK_HISTORY_OPTS="-Dspark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
> -Dspark.history.fs.logDirectory=file:/opt/spark/spark-events
> -Dspark.history.fs.cleaner.enabled=true"
>
> Any ideas?
>
> Thank you
>


Re: Does anyone have experience with using Hadoop InputFormats?

2015-08-01 Thread Antsy.Rao


Sent from my iPad

On 2014-9-24, at 8:13 AM, Steve Lewis  wrote:

>  When I experimented with an InputFormat I had used in Hadoop for a 
> long time, I found that
> 1) it must extend org.apache.hadoop.mapred.FileInputFormat (the deprecated 
> class), not org.apache.hadoop.mapreduce.lib.input.FileInputFormat
> 2) initialize needs to be called in the constructor
> 3) the key/value type parameters - mine extends FileInputFormat - must not be 
> Hadoop Writables; those are not serializable, but extending FileInputFormat 
> with serializable types does work - I don't think this is allowed in Hadoop 
> 
> Are these statements correct? If so, it seems like most Hadoop InputFormats 
> - certainly the custom ones I create - require serious modifications to work. 
> Does anyone have samples of using a Hadoop InputFormat?
> 
> Since I am working with problems where a directory with multiple files is 
> processed, and some files are many gigabytes in size with multiline complex 
> records, an input format is a requirement.
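
For reference, a minimal sketch (not from the original posts; paths and formats are placeholders) of plugging an existing Hadoop InputFormat into Spark through both the old (mapred) and new (mapreduce) APIs, converting the Writables to plain types right away since Writables are not serializable:

// Sketch with assumed input paths; TextInputFormat stands in for a custom format.
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

object HadoopInputFormatSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("input-format-sketch"))

    // Old API: org.apache.hadoop.mapred.* formats go through hadoopFile.
    val oldApi = sc.hadoopFile[LongWritable, Text,
      org.apache.hadoop.mapred.TextInputFormat]("hdfs:///data/in")

    // New API: org.apache.hadoop.mapreduce.lib.input.* formats use newAPIHadoopFile.
    val newApi = sc.newAPIHadoopFile[LongWritable, Text,
      org.apache.hadoop.mapreduce.lib.input.TextInputFormat]("hdfs:///data/in")

    // Convert Writables to serializable types immediately; Writables fail when
    // shuffled or collected because they are not java.io.Serializable.
    val lines = newApi.map { case (_, text) => text.toString }
    println(s"old API records: ${oldApi.count()}, new API lines: ${lines.count()}")

    sc.stop()
  }
}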




No event logs in yarn-cluster mode

2015-08-01 Thread Akmal Abbasov
Hi, I am trying to configure a history server for my application.
When I run it locally (./run-example SparkPi), the event logs are being
created, and I can start the history server.
But when I am trying
./spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster 
file:///opt/hadoop/spark/examples/src/main/python/pi.py 

I am getting 
15/08/01 18:18:50 INFO yarn.Client:
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: 192.168.56.192
 ApplicationMaster RPC port: 0
 queue: default
 start time: 1438445890676
 final status: SUCCEEDED
 tracking URL: http://sp-m1:8088/proxy/application_1438444529840_0009/A
 user: hadoop
15/08/01 18:18:50 INFO util.Utils: Shutdown hook called
15/08/01 18:18:50 INFO util.Utils: Deleting directory 
/tmp/spark-185f7b83-cb3b-4134-a10c-452366204f74
So it is succeeded, but there is no event logs for this application.

here are my configs
spark-defaults.conf
spark.master            yarn-cluster
spark.eventLog.dir  /opt/spark/spark-events
spark.eventLog.enabled  true

spark-env.sh
export HADOOP_CONF_DIR="/opt/hadoop/etc/hadoop"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url="zk1:2181,zk2:2181"
export 
SPARK_HISTORY_OPTS="-Dspark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider
 -Dspark.history.fs.logDirectory=file:/opt/spark/spark-events 
-Dspark.history.fs.cleaner.enabled=true"

Any ideas?

Thank you

About memory leak in spark 1.4.1

2015-08-01 Thread Sea
Hi, all
I upgraded Spark to 1.4.1 and many applications failed... I find that the heap memory is 
not full, but the CoarseGrainedExecutorBackend process takes more 
memory than I expect, and it keeps increasing as time goes on; eventually it exceeds 
the server's limit and the worker dies.


Can anyone help?


Mode: standalone


spark.executor.memory 50g


25583 xiaoju20   0 75.5g  55g  28m S 1729.3 88.1   2172:52 java


55g is more than the 50g I requested.

Re: Spark SQL DataFrame: Nullable column and filtering

2015-08-01 Thread Martin Senne
Dear all,

after some fiddling I have arrived at this solution:

/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame,
                                          commonColumnName: String): DataFrame = {
  val joinedDF = leftDF.as('left).join(rightDF.as('right),
    leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

  import joinedDF.sqlContext.implicits._
  val leftColumns = leftDF.columns
    .map((cn: String) => $"left.$cn")
  val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName))
    .map((cn: String) => $"right.$cn")

  joinedDF.select(leftColumns ++ rightColumns: _*)
}
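
For illustration, a small usage sketch (assuming a SQLContext named sqlContext; Record, Mapping and the sample data are the ones from the minimal example quoted further below):

// Usage sketch only.
import sqlContext.implicits._
import OtherEntities._

val leftDF  = records.toDF()   // columns: x, a
val rightDF = mappings.toDF()  // columns: x, y

val joined = leftOuterJoinWithRemovalOfEqualColumn(leftDF, rightDF, "x")
joined.show()
// roughly:
// +---+-----+----+
// |  x|    a|   y|
// +---+-----+----+
// |  1|hello|null|
// |  2|  bob|   5|
// +---+-----+----+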

Comments welcome

Alternatives I tried:

   - Not Working: If at least the right alias for rightDF is present, one
   could try

   joinedDF.drop("right." + columnname)

   but this does not work (no column is dropped).
   Unfortunately, drop does not support arguments of type Column /
   ColumnNames. *@Michael: Should I create a feature request in Jira for
   drop supporting Columns?*

   -

   Working: without using aliases via as(...), but using column
renaming instead:

   rightDF.withColumnRenamed( commonColumnName, "right_" +
commonColumnName) to rename the right dataframe column, and then use the
join criterion
   leftDF(commonColumnName) === rightDF("right_" + commonColumnName)

   In my opinion not so neat. Opinions? (A sketch of this variant follows below.)
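
A sketch of that renaming variant (my reading of the description above; the helper name and the final drop call are assumptions):

import org.apache.spark.sql.DataFrame

def leftOuterJoinViaRename(leftDF: DataFrame, rightDF: DataFrame,
                           commonColumnName: String): DataFrame = {
  // Rename the right-hand join column so names are unambiguous after the join.
  val renamed = rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName)
  leftDF
    .join(renamed, leftDF(commonColumnName) === renamed("right_" + commonColumnName), "leftouter")
    .drop("right_" + commonColumnName) // unambiguous name, so drop removes only this column
}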


Things I observed:

   - Column handling does not seem consistent
  - select() supports aliases, while drop( ... ) only supports
  strings.
  - DataFrame.apply( ) and DataFrame.col also do not support aliases.
  - Thus the only way to handle ambiguous column names at the moment is via
  select. Can someone please confirm this!
  - Alias information is not displayed via DataFrame.printSchema (or
   at least I did not find a way to display it).

Cheers,

Martin

2015-07-31 22:51 GMT+02:00 Martin Senne :

> Dear Michael, dear all,
>
> a minimal example is listed below.
>
> After some further analysis I could figure out that the problem is
> related to the *leftOuterJoinWithRemovalOfEqualColumn*-Method, as I use
> columns of the left and right dataframes when doing the select on the
> joined table.
>
>   /**
>* Customized left outer join on common column.
>*/
>   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: 
> DataFrame, commonColumnName: String): DataFrame = {
>
> val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
> val rightColumns = rightDF.columns.filterNot(cn => 
> cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
> leftDF.join(rightDF, leftDF(commonColumnName) === 
> rightDF(commonColumnName), "leftouter")
>   .select(leftColumns ++ rightColumns: _*)
>   }
>
> As the column "y" of the right table has nullable=false, this is then also 
> transferred to the y column of the joined table, as I use rightDF( "y" ).
>
> Thus, I need to use columns of the joined table for the select.
>
> *Question now: The joined table has column names "x", "a", "x", "y". How do I 
> discard the second x column?*
>
> All my approaches failed (assuming here that joinedDF is the joined 
> DataFrame).
>
>
>- Using joinedDF.drop( "x" ) discards both "x" columns.
>- Using joinedDF("x") does not work as it is ambiguous.
>- Also using rightDF.as( "aliasname")  in order to differentiate the
>column "x" (from the left DataFrame) from "x" (from the right DataFrame) did not
>work out, as I found no way to use select( $"aliasname.x") truly
>programmatically. Could someone sketch the code?
>
> Any help welcome, thanks
>
>
> Martin
>
>
>
> 
> import org.apache.spark.sql.types.{StructField, StructType}
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.sql.{DataFrame, SQLContext}
>
> object OtherEntities {
>
>   case class Record( x:Int, a: String)
>   case class Mapping( x: Int, y: Int )
>
>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
>   val mappings = Seq( Mapping(2, 5) )
> }
>
> object MinimalShowcase {
>
>   /**
>* Customized left outer join on common column.
>*/
>   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: 
> DataFrame, commonColumnName: String): DataFrame = {
>
> val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
> val rightColumns = rightDF.columns.filterNot(cn => 
> cn.equals(commonColumnName)).map(cn => rightDF(cn))
>
> leftDF.join(rightDF, leftDF(commonColumnName) === 
> rightDF(commonColumnName), "leftouter")
>   .select(leftColumns ++ rightColumns: _*)
>   }
>
>
>   /**
>* Set, if a column is nullable.
>* @param df source DataFrame
>* @param cn is the column name to change
>* @param nullable is the flag to set, such that the column is either 
> nullable or not
>*/
>   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) 
> : DataFrame = {
>
> val schema = df.sche

Re: flatMap output on disk / flatMap memory overhead

2015-08-01 Thread Puneet Kapoor
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized format,
i.e. "MEMORY_AND_DISK_SER" or "MEMORY_ONLY_SER"? That is, changing your
"rdd.persist(MEMORY_AND_DISK)"
to
"rdd.persist(MEMORY_ONLY_SER)"

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid  wrote:

> I agree with Richard.  It looks like the issue here is shuffling, and
> shuffle data is always written to disk, so the issue is definitely not that
> all the output of flatMap has to be stored in memory.
>
> If at all possible, I'd first suggest upgrading to a new version of spark
> -- even in 1.2, there were big improvements to shuffle with sort based
> shuffle as the default.
>
> On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher  wrote:
>
>> Are you sure it's memory related? What is the disk utilization and IO
>> performance on the workers? The error you posted looks to be related to
>> shuffle trying to obtain block data from another worker node and failing to
>> do so in reasonable amount of time. It may still be memory related, but I'm
>> not sure that other resources are ruled out yet.
>>
>> On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea <
>> octavian.ga...@inf.ethz.ch> wrote:
>>
>>> I tried using reduceByKey, without success.
>>>
>>> I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey
>>> .
>>> However, I got the same error as before, namely the error described here:
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
>>>
>>> My task is to count the frequencies of pairs of words that occur in a
>>> set of
>>> documents at least 5 times. I know that this final output is sparse and
>>> should comfortably fit in memory. However, the intermediate pairs that
>>> are
>>> spilled by flatMap might need to be stored on the disk, but I don't
>>> understand why the persist option does not work and my job fails.
>>>
>>> My code:
>>>
>>> rdd.persist(StorageLevel.MEMORY_AND_DISK)
>>>   .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
>>>   .reduceByKey((a, b) => (a + b).toShort)
>>>   .filter({ case ((x, y), count) => count >= 5 })
>>>
>>>
>>> My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node.
>>> One
>>> node I keep for the master, 7 nodes for the workers.
>>>
>>> my conf:
>>>
>>> conf.set("spark.cores.max", "128")
>>> conf.set("spark.akka.frameSize", "1024")
>>> conf.set("spark.executor.memory", "115g")
>>> conf.set("spark.shuffle.file.buffer.kb", "1000")
>>>
>>> my spark-env.sh:
>>>  ulimit -n 20
>>>  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit
>>> -XX:-UseCompressedOops"
>>>  SPARK_DRIVER_MEMORY=129G
>>>
>>> spark version: 1.1.1
>>>
>>> Thank you a lot for your help!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>>
>>
>