Re: Spark standalone - reading kerberos hdfs

2021-01-21 Thread Sudhir Babu Pothineni
Any other insights into this issue? I have tried multiple ways to supply the keytab to 
the executors.

Doesn't Spark standalone support Kerberos?

> On Jan 8, 2021, at 1:53 PM, Sudhir Babu Pothineni  
> wrote:
> 
> 
> In case of Spark on YARN, the Application Master shares the token. 
> 
> I think in case of Spark standalone the token is not shared with the executors. Is 
> there any example of how to get the HDFS token for the executors?
> 
>> On Fri, Jan 8, 2021 at 12:13 PM Gabor Somogyi  
>> wrote:
>> A TGT is not enough; you need an HDFS delegation token, which can be obtained 
>> by Spark. Please check the logs...
>> 
>>> On Fri, 8 Jan 2021, 18:51 Sudhir Babu Pothineni,  
>>> wrote:
>>> I spun up a Spark standalone cluster (spark.authenticate=false) and submitted a 
>>> job which reads from a remote Kerberized HDFS:
>>> 
>>> val spark = SparkSession.builder()
>>>   .master("spark://spark-standalone:7077")
>>>   .getOrCreate()
>>> 
>>> UserGroupInformation.loginUserFromKeytab(principal, keytab)
>>> val df = spark.read.parquet("hdfs://namenode:8020/test/parquet/")
>>> 
>>> I ran into the following exception:
>>> 
>>> Caused by:
>>> java.io.IOException: java.io.IOException: Failed on local exception: 
>>> java.io.IOException: org.apache.hadoop.security.AccessControlException: 
>>> Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host 
>>> is: "..."; destination host is: "...":10346; 
>>> 
>>> 
>>> Any suggestions?
>>> 
>>> Thanks
>>> Sudhir


Re: Spark standalone - reading kerberos hdfs

2021-01-08 Thread Sudhir Babu Pothineni
In case of Spark on YARN, the Application Master shares the token.

I think in case of Spark standalone the token is not shared with the executors.
Is there any example of how to get the HDFS token for the executors?

On Fri, Jan 8, 2021 at 12:13 PM Gabor Somogyi 
wrote:

> A TGT is not enough; you need an HDFS delegation token, which can be obtained
> by Spark. Please check the logs...
>
> On Fri, 8 Jan 2021, 18:51 Sudhir Babu Pothineni, 
> wrote:
>
>> I spun up a Spark standalone cluster (spark.authenticate=false) and submitted
>> a job which reads from a remote Kerberized HDFS:
>>
>> val spark = SparkSession.builder()
>>   .master("spark://spark-standalone:7077")
>>   .getOrCreate()
>>
>> UserGroupInformation.loginUserFromKeytab(principal, keytab)
>> val df = spark.read.parquet("hdfs://namenode:8020/test/parquet/")
>>
>> I ran into the following exception:
>>
>> Caused by:
>> java.io.IOException: java.io.IOException: Failed on local exception:
>> java.io.IOException: org.apache.hadoop.security.AccessControlException:
>> Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host
>> is: "..."; destination host is: "...":10346;
>>
>>
>> Any suggestions?
>>
>> Thanks
>> Sudhir
>>
>


Spark standalone - reading kerberos hdfs

2021-01-08 Thread Sudhir Babu Pothineni
I spun up a Spark standalone cluster (spark.authenticate=false) and submitted a
job which reads from a remote Kerberized HDFS:

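import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("spark://spark-standalone:7077")
  .getOrCreate()

// principal and keytab hold the Kerberos principal name and the keytab path
UserGroupInformation.loginUserFromKeytab(principal, keytab)
val df = spark.read.parquet("hdfs://namenode:8020/test/parquet/")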

I ran into the following exception:

Caused by:
java.io.IOException: java.io.IOException: Failed on local exception:
java.io.IOException: org.apache.hadoop.security.AccessControlException:
Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host
is: "..."; destination host is: "...":10346;


Any suggestions?

Thanks
Sudhir


Re: spark-sklearn

2019-04-08 Thread Sudhir Babu Pothineni
Thanks Stephen, I saw that, but this is an already released version 
(spark-sklearn-0.3.0), so the tests should be working.

So I am just checking whether I am doing anything wrong, e.g. the versions of
other libraries.

Thanks
Sudhir
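
Stephen's quoted Stack Overflow answers below come down to one rule: a Python UDF declared with ArrayType(IntegerType()) should return a plain Python list of plain int values, not a NumPy array or NumPy scalars. The sketch below shows the two pure-Python variants from those answers as ordinary functions, plus a hypothetical helper, is_plain_int_list, that checks the return type. It needs neither Spark nor NumPy, and whether this is also the cause of the failing spark-sklearn test is an assumption, not something the thread confirms.

```python
from collections import OrderedDict


def uniq_unordered(xs):
    # Same idea as udf(lambda x: list(set(x)), ...): order is not guaranteed
    return list(set(xs))


def uniq_ordered(xs):
    # Same idea as udf(lambda xs: list(OrderedDict((x, None) for x in xs)), ...):
    # keeps the first occurrence of each value, in input order
    return list(OrderedDict((x, None) for x in xs))


def is_plain_int_list(value):
    # Hypothetical check: a list whose elements are exactly Python int
    # (bool is rejected too, because type(True) is bool, not int)
    return isinstance(value, list) and all(type(v) is int for v in value)


if __name__ == "__main__":
    data = [3, 1, 3, 2, 1]
    print(uniq_ordered(data))                     # [3, 1, 2]
    print(sorted(uniq_unordered(data)))           # [1, 2, 3]
    print(is_plain_int_list(uniq_ordered(data)))  # True
```

With NumPy installed, np.unique(x).tolist() yields the same kind of plain list, whereas list(np.unique(x)) keeps NumPy scalar elements.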

> On Apr 8, 2019, at 1:52 PM, Stephen Boesch  wrote:
> 
> There are several suggestions on this Stack Overflow question:
> https://stackoverflow.com/questions/38984775/spark-errorexpected-zero-arguments-for-construction-of-classdict-for-numpy-cor
> 
> You need to convert the final value to a Python list. You can implement the 
> function as follows:
> 
> import numpy as np
> 
> def uniq_array(col_array):
>     x = np.unique(col_array)
>     # tolist() gives a plain list of native Python scalars;
>     # list(x) would keep the NumPy element types
>     return x.tolist()
> 
> This is because Spark doesn't understand the NumPy array format. In order to 
> feed a Python object that Spark DataFrames understand as an ArrayType, you 
> need to convert the output to a Python list before returning it.
> 
> The source of the problem is that the object returned from the UDF doesn't 
> conform to the declared type. np.unique not only returns numpy.ndarray but 
> also converts numerics to the corresponding NumPy types, which are not 
> compatible with the DataFrame API. You can try something like this:
> 
> udf(lambda x: list(set(x)), ArrayType(IntegerType()))
> or this (to keep order)
> 
> udf(lambda xs: list(OrderedDict((x, None) for x in xs)), 
> ArrayType(IntegerType()))
> instead.
> 
> If you really want np.unique you have to convert the output:
> 
> udf(lambda x: np.unique(x).tolist(), ArrayType(IntegerType()))
> 
>> On Mon, Apr 8, 2019 at 11:43 AM, Sudhir Babu Pothineni 
>> wrote:
>> 
>> 
>> 
>>> I am trying to run the tests in spark-sklearn. Has anybody seen the exception below?
>>> 
>>> pip freeze:
>>> 
>>> nose==1.3.7
>>> numpy==1.16.1
>>> pandas==0.19.2
>>> python-dateutil==2.7.5
>>> pytz==2018.9
>>> scikit-learn==0.19.2
>>> scipy==1.2.0
>>> six==1.12.0
>>> spark-sklearn==0.3.0
>>> 
>>> Spark version:
>>> spark-2.2.3-bin-hadoop2.6/bin/pyspark
>>> 
>>> 
>>> I am running into the following exception:
>>> 
>>> ==
>>> ERROR: test_scipy_sparse (spark_sklearn.converter_test.CSRVectorUDTTests)
>>> --
>>> Traceback (most recent call last):
>>>   File 
>>> "/home/spothineni/Downloads/spark-sklearn-release-0.3.0/python/spark_sklearn/converter_test.py",
>>>  line 83, in test_scipy_sparse
>>> self.assertEqual(df.count(), 1)
>>>   File 
>>> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py",
>>>  line 522, in count
>>> return int(self._jdf.count())
>>>   File 
>>> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>>>  line 1257, in __call__
>>> answer, self.gateway_client, self.target_id, self.name)
>>>   File 
>>> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/pyspark/sql/utils.py",
>>>  line 63, in deco
>>> return f(*a, **kw)
>>>   File 
>>> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
>>>  line 328, in get_return_value
>>> format(target_id, ".", name), value)
>>> Py4JJavaError: An error occurred while calling o652.count.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 
>>> 11 in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in 
>>> stage 0.0 (TID 11, localhost, executor driver): 
>>> net.razorvine.pickle.PickleException: expected zero arguments for 
>>> construction of ClassDict (for numpy.dtype)
>>> at 
>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>> at 
>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
>>> at 
>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(

Re: spark-sklearn

2019-04-08 Thread Sudhir Babu Pothineni


> 
> I am trying to run the tests in spark-sklearn. Has anybody seen the exception below?
> 
> pip freeze:
> 
> nose==1.3.7
> numpy==1.16.1
> pandas==0.19.2
> python-dateutil==2.7.5
> pytz==2018.9
> scikit-learn==0.19.2
> scipy==1.2.0
> six==1.12.0
> spark-sklearn==0.3.0
> 
> Spark version:
> spark-2.2.3-bin-hadoop2.6/bin/pyspark
> 
> 
> I am running into the following exception:
> 
> ==
> ERROR: test_scipy_sparse (spark_sklearn.converter_test.CSRVectorUDTTests)
> --
> Traceback (most recent call last):
>   File 
> "/home/spothineni/Downloads/spark-sklearn-release-0.3.0/python/spark_sklearn/converter_test.py",
>  line 83, in test_scipy_sparse
> self.assertEqual(df.count(), 1)
>   File 
> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py",
>  line 522, in count
> return int(self._jdf.count())
>   File 
> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>  line 1257, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File 
> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/pyspark/sql/utils.py",
>  line 63, in deco
> return f(*a, **kw)
>   File 
> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
>  line 328, in get_return_value
> format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o652.count.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 
> in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in stage 0.0 
> (TID 11, localhost, executor driver): net.razorvine.pickle.PickleException: 
> expected zero arguments for construction of ClassDict (for numpy.dtype)
>   at 
> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>   at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>   at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>   at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>   at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>   at 
> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
>   at 
> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:187)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.run(Task.scala:121)
>   at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 
> Driver stacktrace:
>   at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
>   at 
> org.apache.spark.sche

ORC file stripe statistics in Spark

2016-09-27 Thread Sudhir Babu Pothineni
I am trying to get the number of rows in each stripe of an ORC file.

Does hivecontext.orcFile not exist anymore? I am using Spark 1.6.0.

scala> val hiveSqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveSqlContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@83223af

scala> hiveSqlContext.
analyze                   applySchema               asInstanceOf
baseRelationToDataFrame   cacheTable                clearCache
createDataFrame           createDataset             createExternalTable
dropTempTable             emptyDataFrame            experimental
getAllConfs               getConf                   implicits
isCached                  isInstanceOf              isRootContext
jdbc                      jsonFile                  jsonRDD
listenerManager           load                      newSession
parquetFile               range                     read
refreshTable              setConf                   sparkContext
sql                       table                     tableNames
tables                    toString                  udf
uncacheTable


Thanks
Sudhir


Re: Zero Data Loss in Spark with Kafka

2016-08-23 Thread Sudhir Babu Pothineni
Saving offsets to ZooKeeper is the old approach; checkpointing internally
saves the offsets to the checkpoint location (e.g. on HDFS).

More details here:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
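
Whether the offsets live in the checkpoint directory or in ZooKeeper, the ordering that avoids data loss is the same: write a batch's output first, then persist the offsets atomically, and on restart resume from the saved offsets. The sketch below illustrates only that ordering. The in-memory topic dict, the JSON offsets file, and the function names are hypothetical; this is not Spark's checkpoint format or the Kafka consumer API.

```python
import json
import os
import tempfile


def load_offsets(path, partitions):
    """Return {partition: next offset to read}, starting at 0 if nothing was saved."""
    if not os.path.exists(path):
        return {p: 0 for p in partitions}
    with open(path) as f:
        saved = json.load(f)
    # JSON object keys are strings, so convert them back to int partition ids
    return {int(p): off for p, off in saved.items()}


def save_offsets(path, offsets):
    """Write offsets to a temp file, then rename it over the old file in one step."""
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(path)))
    with os.fdopen(fd, "w") as f:
        json.dump(offsets, f)
    os.replace(tmp_path, path)


def run_batch(topic, path, max_records, sink):
    """Process up to max_records per partition of a hypothetical in-memory topic.

    Output goes to sink first and offsets are saved afterwards, so a crash in
    between replays the batch (at-least-once) instead of skipping it.
    """
    offsets = load_offsets(path, topic.keys())
    for partition, records in sorted(topic.items()):
        start = offsets.get(partition, 0)
        end = min(start + max_records, len(records))
        sink.extend(records[start:end])
        offsets[partition] = end
    save_offsets(path, offsets)
    return offsets
```

A crash after the output is written but before save_offsets runs causes that batch to be processed again, so this gives at-least-once delivery; avoiding duplicates as well needs idempotent or transactional output.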

On Tue, Aug 23, 2016 at 10:30 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi Experts,
>
> I am looking for some information on how to achieve zero data loss while
> working with Kafka and Spark. I have searched online, and blogs give
> different answers. Please let me know if anyone has an idea on this.
>
> Blog 1:
> https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
>
>
> Blog 2:
> http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
>
>
> Blog 1 simply describes a configuration change with a checkpoint directory,
> and blog 2 gives details on how to save offsets to ZooKeeper. Can you
> please help me with the right approach?
>
> Thanks,
> Asmath
>
>
>


Re: ORC v/s Parquet for Spark 2.0

2016-07-27 Thread Sudhir Babu Pothineni
It depends on what you are doing; here is a recent comparison of ORC and Parquet:

https://www.slideshare.net/mobile/oom65/file-format-benchmarks-avro-json-orc-parquet

Although it comes from the ORC authors, I thought it was a fair comparison. We use 
ORC as the system of record on our Cloudera HDFS cluster, and our experience so far is good.

Parquet is backed by Cloudera, which has more Hadoop installations, and ORC by 
Hortonworks, so the battle of file formats continues...

Sent from my iPhone

> On Jul 27, 2016, at 4:54 PM, janardhan shetty  wrote:
> 
> It seems like the Parquet format is better than ORC when the dataset is log 
> data without nested structures? Is this a fair understanding?
> 
>> On Jul 27, 2016 1:30 PM, "Jörn Franke"  wrote:
>> My impression is that Kudu has been designed to offer something between 
>> HBase and Parquet for write-intensive loads. It is not faster than Parquet 
>> for warehouse-type querying (rather slower, because that is not its use 
>> case). I assume this is still its strategy.
>> 
>> In some scenarios it could make sense together with Parquet and ORC. 
>> However, I am not sure what the advantage would be over using HBase + 
>> Parquet and ORC.
>> 
>>> On 27 Jul 2016, at 11:47, "u...@moosheimer.com"  wrote:
>>> 
>>> Hi Gourav,
>>> 
>>> Kudu (if you mean Apache Kudu, the Cloudera-originated project) is an 
>>> in-memory DB with data storage, while Parquet is "only" a columnar storage 
>>> format.
>>> 
>>> As I understand it, Kudu is a BI DB meant to compete with Exasol or HANA 
>>> (OK... that's more of a wish :-).
>>> 
>>> Regards,
>>> Uwe
>>> 
>>> Best regards
>>> Kay-Uwe Moosheimer
>>> 
>>>> On 27.07.2016 at 09:15, Gourav Sengupta wrote:
>>>> 
>>>> Gosh,
>>>> 
>>>> whether ORC came from this or that, it runs queries in Hive with Tez at a 
>>>> speed that is better than Spark.
>>>> 
>>>> Has anyone heard of KUDA? It's better than Parquet. But I think that 
>>>> someone might just start saying that KUDA has a difficult lineage as well. 
>>>> After all, dynastic rules dictate.
>>>> 
>>>> Personally I feel that if something stores my data compressed and lets me 
>>>> access it faster, I do not care where it comes from or how difficult the 
>>>> child birth was :)
>>>> 
>>>> 
>>>> Regards,
>>>> Gourav
>>>> 
>>>>> On Tue, Jul 26, 2016 at 11:19 PM, Sudhir Babu Pothineni 
>>>>>  wrote:
>>>>> Just a correction:
>>>>> 
>>>>> The ORC Java libraries from Hive have been forked into Apache ORC, with 
>>>>> vectorization by default. 
>>>>> 
>>>>> I do not know whether Spark is leveraging this new repo.
>>>>> 
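>>>>> <dependency>
>>>>>   <groupId>org.apache.orc</groupId>
>>>>>   <artifactId>orc</artifactId>
>>>>>   <version>1.1.2</version>
>>>>>   <type>pom</type>
>>>>> </dependency>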
>>>>> 
>>>>>> On Jul 26, 2016, at 4:50 PM, Koert Kuipers  wrote:
>>>>>> 
>>>>> 
>>>>>> parquet was inspired by dremel but written from the ground up as a 
>>>>>> library with support for a variety of big data systems (hive, pig, 
>>>>>> impala, cascading, etc.). it is also easy to add new support, since it's
>>>>>> a proper library.
>>>>>> 
>>>>>> orc has been enhanced while deployed at facebook in hive and at yahoo in 
>>>>>> hive. just hive. it didn't really exist by itself. it was part of the 
>>>>>> big java soup that is called hive, without an easy way to extract it. 
>>>>>> hive does not expose proper java apis. it never cared for that.
>>>>>> 
>>>>>>> On Tue, Jul 26, 2016 at 9:57 AM, Ovidiu-Cristian MARCU 
>>>>>>>  wrote:
>>>>>>> Interesting opinion, thank you
>>>>>>> 
>>>>>>> Still, according to the websites, Parquet is basically inspired by Dremel 
>>>>>>> (Google) [1], and part of ORC has been enhanced while deployed at 
>>>>>>> Facebook and Yahoo [2].
>>>>>>> 
>>>>>>> Other than this presentation [3], d

Re: ORC v/s Parquet for Spark 2.0

2016-07-26 Thread Sudhir Babu Pothineni
Just a correction:

The ORC Java libraries from Hive have been forked into Apache ORC, with vectorization by default. 

I do not know whether Spark is leveraging this new repo.

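<dependency>
  <groupId>org.apache.orc</groupId>
  <artifactId>orc</artifactId>
  <version>1.1.2</version>
  <type>pom</type>
</dependency>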

> On Jul 26, 2016, at 4:50 PM, Koert Kuipers  wrote:
> 
> parquet was inspired by dremel but written from the ground up as a library 
> with support for a variety of big data systems (hive, pig, impala, cascading, 
> etc.). it is also easy to add new support, since it's a proper library.
> 
> orc has been enhanced while deployed at facebook in hive and at yahoo in 
> hive. just hive. it didn't really exist by itself. it was part of the big 
> java soup that is called hive, without an easy way to extract it. hive does 
> not expose proper java apis. it never cared for that.
> 
>> On Tue, Jul 26, 2016 at 9:57 AM, Ovidiu-Cristian MARCU 
>>  wrote:
>> Interesting opinion, thank you
>> 
>> Still, according to the websites, Parquet is basically inspired by Dremel 
>> (Google) [1], and part of ORC has been enhanced while deployed at Facebook and Yahoo [2].
>> 
>> Other than this presentation [3], do you know of any other benchmarks?
>> 
>> [1] https://parquet.apache.org/documentation/latest/
>> [2] https://orc.apache.org/docs/
>> [3] http://www.slideshare.net/oom65/file-format-benchmarks-avro-json-orc-parquet
>> 
>>> On 26 Jul 2016, at 15:19, Koert Kuipers  wrote:
>>> 
>>> when parquet came out it was developed by a community of companies, and was 
>>> designed as a library to be supported by multiple big data projects. nice
>>> 
>>> orc on the other hand initially only supported hive. it wasn't even 
>>> designed as a library that can be re-used. even today it brings in the 
>>> kitchen sink of transitive dependencies. yikes
>>> 
>>> 
 On Jul 26, 2016 5:09 AM, "Jörn Franke"  wrote:
 I think both are very similar, but with slightly different goals. While 
 they work transparently with every Hadoop application, you need to enable 
 specific support in the application for predicate pushdown. 
 In the end you have to check which application you are using and do some 
 tests (with the correct predicate pushdown configuration). Keep in mind that 
 both formats work best if they are sorted on filter columns (which is your 
 responsibility) and if their optimizations are correctly configured (min/max 
 index, Bloom filter, compression, etc.). 
 
 If you need to ingest sensor data, you may want to store it in HBase first 
 and then batch-process it into large files in ORC or Parquet format.
 
> On 26 Jul 2016, at 04:09, janardhan shetty  wrote:
> 
> Just wondering about the advantages and disadvantages of converting data into 
> ORC or Parquet. 
> 
> In the Spark documentation there are numerous examples of the Parquet 
> format. 
> 
> Any strong reasons to choose Parquet over the ORC file format?
> 
> Also: the current data compression is bzip2.
> 
> http://stackoverflow.com/questions/32373460/parquet-vs-orc-vs-orc-with-snappy
>  
> This seems biased.
> 
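
Jörn's advice about sorting on filter columns and configuring min/max indexes can be illustrated without either file format. The sketch below is a simplified, hypothetical model: it splits a column into fixed-size chunks (standing in for ORC stripes or Parquet row groups), keeps each chunk's min and max, and skips any chunk whose range cannot contain the value being filtered. Real ORC and Parquet readers keep richer statistics and can also use Bloom filters, so treat this only as an illustration of why sorting helps.

```python
def build_stripes(values, stripe_size):
    """Split values into fixed-size chunks and record (min, max) for each one."""
    stripes = []
    for i in range(0, len(values), stripe_size):
        chunk = values[i:i + stripe_size]
        stripes.append({"min": min(chunk), "max": max(chunk), "rows": chunk})
    return stripes


def scan_equal(stripes, target):
    """Return the matching rows and how many stripes had to be read.

    A stripe is skipped when target lies outside its [min, max] range,
    which is the idea behind min/max predicate pushdown.
    """
    matches, stripes_read = [], 0
    for stripe in stripes:
        if stripe["min"] <= target <= stripe["max"]:
            stripes_read += 1
            matches.extend(v for v in stripe["rows"] if v == target)
    return matches, stripes_read


if __name__ == "__main__":
    data = [7, 2, 9, 4, 1, 8, 3, 6, 5, 0, 9, 2]
    for label, values in (("unsorted", data), ("sorted", sorted(data))):
        matches, read = scan_equal(build_stripes(values, 3), 5)
        print(label, matches, "stripes read:", read, "of", len(values) // 3)
```

On the unsorted sample every chunk's range happens to cover 5, so all four chunks must be read; after sorting, only one chunk does.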


Re: Limit pyspark.daemon threads

2016-06-15 Thread Sudhir Babu Pothineni
Hi Ken, could it also be related to Grid Engine job scheduling? If the machine has 16 cores 
(virtual cores?), Grid Engine allocates 16 slots. If you use 'max' scheduling, 
it will send 16 processes sequentially to the same machine, and on top of that each 
Spark job has its own executors. Limiting the number of jobs scheduled to the 
machine to the number of physical cores of a single CPU should solve the problem if 
it is related to GE. If you are sure it's related to Spark, please ignore this.

-Sudhir


> On Jun 15, 2016, at 8:53 AM, Gene Pang  wrote:
> 
> As Sven mentioned, you can use Alluxio to store RDDs in off-heap memory, and 
> you can then share that RDD across different jobs. If you would like to run 
> Spark on Alluxio, this documentation can help: 
> http://www.alluxio.org/documentation/master/en/Running-Spark-on-Alluxio.html
> 
> Thanks,
> Gene
> 
>> On Tue, Jun 14, 2016 at 12:44 AM, agateaaa  wrote:
>> Hi,
>> 
>> I am seeing this issue too with PySpark (using Spark 1.6.1). I have set 
>> spark.executor.cores to 1, but whenever a streaming batch starts 
>> processing data, I see the number of python -m pyspark.daemon processes 
>> increase gradually to about 5 (increasing CPU% on the box about 4-5 times; 
>> each pyspark.daemon takes up around 100% CPU). 
>> 
>> After the processing is done, 4 pyspark.daemon processes go away and we are 
>> left with one until the next batch run. Also, the CPU usage of the executor 
>> process sometimes spikes to about 800% even though spark.executor.cores is 
>> set to 1.
>> 
>> e.g. top output:
>> PID   USER  PR NI VIRT    RES    SHR   S %CPU  %MEM TIME+   COMMAND
>> 19634 spark 20 0  8871420 1.790g 32056 S 814.1 2.9  0:39.33 /usr/lib/j+  <-- EXECUTOR
>> 13897 spark 20 0  46576   17916  6720  S 100.0 0.0  0:00.17 python -m +  <-- pyspark.daemon
>> 13991 spark 20 0  46524   15572  4124  S 98.0  0.0  0:08.18 python -m +  <-- pyspark.daemon
>> 14488 spark 20 0  46524   15636  4188  S 98.0  0.0  0:07.25 python -m +  <-- pyspark.daemon
>> 14514 spark 20 0  46524   15636  4188  S 94.0  0.0  0:06.72 python -m +  <-- pyspark.daemon
>> 14526 spark 20 0  48200   17172  4092  S 0.0   0.0  0:00.38 python -m +  <-- pyspark.daemon
>> 
>> 
>> 
>> Is there any way to control the number of pyspark.daemon processes that get 
>> spawned?
>> 
>> Thank you
>> Agateaaa
>> 
>>> On Sun, Mar 27, 2016 at 1:08 AM, Sven Krasser  wrote:
>>> Hey Ken,
>>> 
>>> 1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap 
>>> storage option using Alluxio, formerly Tachyon, with which I have no 
>>> experience however.)
>>> 
>>> 2. The worker memory setting is not a hard maximum unfortunately. What 
>>> happens is that during aggregation the Python daemon will check its process 
>>> size. If the size is larger than this setting, it will start spilling to 
>>> disk. I've seen many occasions where my daemons grew larger. Also, you're 
>>> relying on Python's memory management to free up space again once objects 
>>> are evicted. In practice, leave this setting reasonably small but make sure 
>>> there's enough free memory on the machine so you don't run into OOM 
>>> conditions. If the lower memory setting causes strains for your users, make 
>>> sure they increase the parallelism of their jobs (smaller partitions 
>>> meaning less data is processed at a time).
>>> 
>>> 3. I believe that is the behavior you can expect when setting 
>>> spark.executor.cores. I've not experimented much with it and haven't looked 
>>> at that part of the code, but what you describe also reflects my 
>>> understanding. Please share your findings here, I'm sure those will be very 
>>> helpful to others, too.
>>> 
>>> One more suggestion for your users is to move to the PySpark DataFrame API. 
>>> Much of the processing will then happen in the JVM, and you will bump into 
>>> fewer Python resource contention issues.
>>> 
>>> Best,
>>> -Sven
>>> 
>>> 
 On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken  
 wrote:
 This is extremely helpful!
 
 I’ll have to talk to my users about how the python memory limit should be 
 adjusted and what their expectations are. I’m fairly certain we bumped it 
 up in the dark past when jobs were failing because of insufficient memory 
 for the python processes. 
 
 So just to make sure I’m understanding correctly: 
 
 JVM memory (set by SPARK_EXECUTOR_MEMORY and/or SPARK_WORKER_MEMORY?) is 
 where the RDDs are stored. Currently both of those values are set to 90GB.
 spark.python.worker.memory controls how much RAM each python task can take 
 at most (roughly speaking). Currently set to 4GB.
 spark.task.cpus controls how many java worker threads will exist and thus 
 indirectly how many pyspark daemon processes will exist.
 
 I’m also looking into fixing my cron jobs so they don’t stack up by 
 implementing flock in the jobs and changing how teardowns
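
Sven's description of spark.python.worker.memory above (a threshold that triggers spilling during aggregation, not a hard cap) can be illustrated with a small, hypothetical aggregator. This is not PySpark's implementation: it approximates "memory" by the number of distinct keys held in a dict rather than the process size, and writes partial sums to JSON files. The function name and parameters are illustrative only.

```python
import json
import os
import tempfile
from collections import defaultdict


def aggregate_with_spill(pairs, max_keys_in_memory, spill_dir):
    """Sum values per string key, spilling partial sums to disk when the buffer grows.

    "Memory" is approximated by the number of distinct keys in the buffer
    (a simplifying assumption). The limit is checked only after an insert,
    so the buffer can briefly exceed it: a soft threshold, not a hard cap.
    Keys must be strings so they survive the JSON round trip.
    """
    buffer = defaultdict(int)
    spill_paths = []
    for key, value in pairs:
        buffer[key] += value
        if len(buffer) > max_keys_in_memory:
            # Write the partial sums to a fresh file and start a new buffer
            fd, path = tempfile.mkstemp(dir=spill_dir, suffix=".json")
            with os.fdopen(fd, "w") as f:
                json.dump(buffer, f)
            spill_paths.append(path)
            buffer = defaultdict(int)
    # Merge the in-memory remainder with every spilled partial result
    result = defaultdict(int, buffer)
    for path in spill_paths:
        with open(path) as f:
            for key, value in json.load(f).items():
                result[key] += value
        os.remove(path)
    return dict(result), len(spill_paths)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as spill_dir:
        pairs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("d", 5), ("b", 6)]
        print(aggregate_with_spill(pairs, 2, spill_dir))
```

A smaller threshold only changes how often partial results go to disk, not the final totals, which matches Sven's advice to keep the setting reasonably small while leaving enough free memory on the machine.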