custom join using complex keys

2015-05-09 Thread Mathieu D
Hi folks,

I need to join RDDs having composite keys like this: (K1, K2, ..., Kn).

The joining rule looks like this:
* if left.K1 == right.K1, then we have a true equality, and all of K2 ... Kn
are also equal.
* if left.K1 != right.K1 but left.K2 == right.K2, I have a partial
equality, and I also want the join to occur there.
* if K2 doesn't match either, then I test K3, and so on.

Is there a way to implement a custom join with a given predicate to
implement this? (I would probably also need to provide a partitioner and
some sorting predicate.)
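
For concreteness, here is roughly the cascade I have in mind for three key
components (an untested sketch; Rec is just a stand-in for my real record type):

  import org.apache.spark.rdd.RDD

  case class Rec(k1: String, k2: String, k3: String, payload: String)

  // Join on K1 first; rows that found no K1 match fall through to a join on K2, and so on.
  def cascadeJoin(left: RDD[Rec], right: RDD[Rec]): RDD[(Rec, Rec)] = {
    val byK1      = left.keyBy(_.k1).join(right.keyBy(_.k1)).values
    val leftRest  = left.keyBy(_.k1).subtractByKey(right.keyBy(_.k1)).values
    val rightRest = right.keyBy(_.k1).subtractByKey(left.keyBy(_.k1)).values
    val byK2      = leftRest.keyBy(_.k2).join(rightRest.keyBy(_.k2)).values
    byK1 union byK2   // extend the same pattern for K3 ... Kn
  }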

The left and right RDDs are 1-10 million lines long.
Any idea?

Thanks
Mathieu


Spark streaming closes with Cassandra Connector

2015-05-09 Thread Sergio Jiménez Barrio
I am trying to save some data to Cassandra in an app with Spark Streaming:

// needs: import com.datastax.spark.connector._
Messages.foreachRDD { rdd =>
  // ... (transformations elided)
  rdd.saveToCassandra("test", "test")   // keyspace, table
}

When I run it, the app closes when I receive data or when it can't connect to
Cassandra.
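
For context, the rest of the setup looks roughly like this (simplified; the
Cassandra host and the input source are placeholders):

  import com.datastax.spark.connector._
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf()
    .setAppName("cassandra-streaming")
    .set("spark.cassandra.connection.host", "127.0.0.1")   // Cassandra contact point
  val ssc = new StreamingContext(conf, Seconds(5))
  val Messages = ssc.socketTextStream("localhost", 9999)    // placeholder source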

Any ideas? Thanks


-- 
Atte. Sergio Jiménez


Can NullWritable not be used in Spark?

2015-05-09 Thread donhoff_h
Hi, experts.


I wrote a Spark program to write a sequence file. I found that if I used
NullWritable as the key class of the SequenceFile, the program threw
exceptions, but if I used BytesWritable or Text as the key class, it did not.
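
Here is roughly what the program does (a simplified sketch, not the exact
code; the output path is a placeholder):

  import org.apache.hadoop.io.{NullWritable, Text}
  import org.apache.spark.{SparkConf, SparkContext}

  object TestWriteSeqFile02 {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("TestWriteSeqFile02"))
      val data = sc.parallelize(Seq("a", "b", "c"))
      // NullWritable as the key class triggers the exception; Text or BytesWritable works fine.
      data.map(v => (NullWritable.get(), new Text(v)))
          .saveAsSequenceFile("/tmp/test-seqfile")   // placeholder path
      sc.stop()
    }
  }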


Does Spark not support the NullWritable class? The Spark version I use is 1.3.0,
and the exceptions are as follows:


ERROR yarn.ApplicationMaster: User class threw exception: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at dhao.test.SeqFile.TestWriteSeqFile02$.main(TestWriteSeqFile02.scala:21)
at dhao.test.SeqFile.TestWriteSeqFile02.main(TestWriteSeqFile02.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)

Re: Spark can not access jar from HDFS !!

2015-05-09 Thread Michael Armbrust
That code path is entirely delegated to Hive. Does Hive support this? You
might try instead using sparkContext.addJar.
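
For example, something along these lines might work (an untested sketch):
ship the jar through the SparkContext and register the function without the
USING JAR clause:

  sc.addJar("hdfs:///users/ravindra/customUDF2.jar")
  hiveContext.sql("CREATE TEMPORARY FUNCTION sample_to_upper AS 'com.abc.api.udf.MyUpper'")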

On Sat, May 9, 2015 at 12:32 PM, Ravindra ravindra.baj...@gmail.com wrote:

 Hi All,

 I am trying to create custom udfs with hiveContext as given below -
 scala> hiveContext.sql("CREATE TEMPORARY FUNCTION sample_to_upper AS
 'com.abc.api.udf.MyUpper' USING JAR
 'hdfs:///users/ravindra/customUDF2.jar'")

 I have put the udf jar in the hdfs at the path given above. The same
 command works well in the hive shell but failing here in the spark shell.
 And it fails as given below. -
 15/05/10 00:41:51 ERROR Task: FAILED:
 org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR
 hdfs:///users/ravindra/customUDF2.jar
 15/05/10 00:41:51 INFO FunctionTask: create function:
 org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR
 hdfs:///users/ravindra/customUDF2.jar
 at
 org.apache.hadoop.hive.ql.exec.FunctionTask.addFunctionResources(FunctionTask.java:305)
 at
 org.apache.hadoop.hive.ql.exec.FunctionTask.createTemporaryFunction(FunctionTask.java:179)
 at
 org.apache.hadoop.hive.ql.exec.FunctionTask.execute(FunctionTask.java:81)
 at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
 at
 org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
 at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503)
 at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270)
 at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
 at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
 at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
 at
 org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
 at
 org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
 at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
 at
 org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
 at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
 at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
 at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
 at $line17.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:18)
 at $line17.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:23)
 at $line17.$read$$iwC$$iwC$$iwC$$iwC.init(console:25)
 at $line17.$read$$iwC$$iwC$$iwC.init(console:27)
 at $line17.$read$$iwC$$iwC.init(console:29)
 at $line17.$read$$iwC.init(console:31)
 at $line17.$read.init(console:33)
 at $line17.$read$.init(console:37)
 at $line17.$read$.clinit(console)
 at $line17.$eval$.init(console:7)
 at $line17.$eval$.clinit(console)
 at $line17.$eval.$print(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
 at
 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
 at
 org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
 at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
 at
 scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at 

Re: spark and binary files

2015-05-09 Thread ayan guha
Spark uses any InputFormat you specify, and the number of splits equals the
number of RDD partitions. You may want to take a deeper look at
SparkContext.newAPIHadoopRDD to load your data.
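
For example, something along these lines (an untested sketch; FixedLengthInputFormat
merely stands in for whatever Hadoop InputFormat your proprietary format already has,
and the path and record length are placeholders):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.io.{BytesWritable, LongWritable}
  import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat
  import org.apache.spark.{SparkConf, SparkContext}

  val sc = new SparkContext(new SparkConf().setAppName("binary-ingest"))
  val conf = new Configuration()
  FixedLengthInputFormat.setRecordLength(conf, 64)   // placeholder record size

  // One input split becomes one RDD partition, as in the MR job.
  val raw = sc.newAPIHadoopFile(
    "hdfs:///data/binary",               // placeholder path
    classOf[FixedLengthInputFormat],
    classOf[LongWritable],
    classOf[BytesWritable],
    conf)

  // Build the business objects in parallel, one per raw record.
  val objects = raw.map { case (_, bytes) => bytes.copyBytes() /* parse into your object here */ }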



On Sat, May 9, 2015 at 4:48 PM, tog guillaume.all...@gmail.com wrote:

 Hi

 I have an application that currently runs using MR. It starts by creating
 business objects from information extracted from a proprietary binary file
 that is copied to HDFS. Later those objects are used for further
 processing, again with MR jobs.

 I am planning to move towards Spark, and I clearly see that I could use a
 JavaRDD<BusinessObject> for parallel processing. However, it is not yet
 obvious what the process to generate this RDD from my binary file in
 parallel would be.

 Today I use parallelism based on the splits assigned to each of the map
 tasks of a job. Can I mimic such a thing using Spark? All the examples I
 have seen so far use text files, for which I guess the partitions are
 based on a given number of contiguous lines.

 Any help or pointer would be appreciated

 Cheers
 Guillaume


 --
 PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net




-- 
Best Regards,
Ayan Guha


Re: Submit Spark application in cluster mode and supervised

2015-05-09 Thread James King
Many Thanks Silvio,

What I found out later is that if there is a catastrophic failure and all the
daemons fail at the same time, before any fail-over takes place, then when you
bring the cluster back up the job resumes only on the Master it was last
running on before the failure.

Otherwise, during a partial failure, normal fail-over takes place and the
driver is handed over to another Master.

Which answers my initial question.

Regards
jk

On Fri, May 8, 2015 at 7:34 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:

   If you’re using multiple masters with ZooKeeper then you should set
 your master URL to be

  spark://host01:7077,host02:7077

  And the property spark.deploy.recoveryMode=ZOOKEEPER

  See here for more info:
 http://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper
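
  For example, in conf/spark-env.sh on each master (a sketch; the ZooKeeper
  quorum address below is a placeholder):

    SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
      -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181 \
      -Dspark.deploy.zookeeper.dir=/spark"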

   From: James King
 Date: Friday, May 8, 2015 at 11:22 AM
 To: user
 Subject: Submit Spark application in cluster mode and supervised

   I have two hosts, host01 and host02 (let's call them that).

  I run one Master and two Workers on host01
 I also run one Master and two Workers on host02

  Now I have 1 LIVE Master on host01 and a STANDBY Master on host02
 The LIVE Master is aware of all Workers in the cluster

  Now I submit a Spark application using

  bin/spark-submit --class SomeApp --deploy-mode cluster --supervise
 --master spark://host01:7077 Some.jar

  This to make the driver resilient to failure.

  Now the interesting part:

  If I stop the cluster (all daemons on all hosts) and restart
 the Master and Workers *only* on host01, the job resumes, as expected.

  But if I stop the cluster (all daemons on all hosts) and restart the
 Master and Workers *only* on host02, the job *does not*
 resume execution! Why?

  I can see the driver on host02 WebUI listed but no job execution. Please
 let me know why.

  Am I wrong to expect it to resume execution in this case?








Re: Duplicate entries in output of mllib column similarities

2015-05-09 Thread Richard Bolkey
Hi Reza,

After a bit of digging, I had my previous issue a little bit wrong. We're
not getting duplicate (i,j) entries, but we are getting transposed entries
(i,j) and (j,i) with potentially different scores. We assumed the output
would be a triangular matrix. Still, let me know if that's expected. A
transposed entry occurs for about 5% of our output entries.

scala> matrix.entries.filter(x => (x.i, x.j) == (22769, 539029)).collect()
res23: Array[org.apache.spark.mllib.linalg.distributed.MatrixEntry] =
Array(MatrixEntry(22769,539029,0.00453050595770095))

scala> matrix.entries.filter(x => (x.i, x.j) == (539029, 22769)).collect()
res24: Array[org.apache.spark.mllib.linalg.distributed.MatrixEntry] =
Array(MatrixEntry(539029,22769,0.002265252978850475))

I saved a subset of vectors to object files that replicates the issue.
It's about 300 MB. Should I try to whittle that down some more? What would
be the best way to get that to you?

Many thanks,
Rick

On Thu, May 7, 2015 at 8:58 PM, Reza Zadeh r...@databricks.com wrote:

 This shouldn't be happening, do you have an example to reproduce it?

 On Thu, May 7, 2015 at 4:17 PM, rbolkey rbol...@gmail.com wrote:

 Hi,

 I have a question regarding one of the oddities we encountered while
 running
 mllib's column similarities operation. When we examine the output, we find
 duplicate matrix entries (the same i,j). Sometimes the entries have the
 same
 value/similarity score, but they're frequently different too.

 Is this a known issue? An artifact of the probabilistic nature of the
 output? Which output score should we trust (the lower or the higher one when
 they differ)? We're using a threshold of 0.3, and running Spark 1.3.1 on a
 10-node cluster.

 Thanks
 Rick








Re: Spark SQL and Hive interoperability

2015-05-09 Thread barge.nilesh
Hi, try your first method, but create an *external* table in Hive.

Like:
hive -e "CREATE EXTERNAL TABLE people (name STRING, age INT) ROW FORMAT
DELIMITED FIELDS TERMINATED BY '\t';"
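
Once created that way, the table should also be queryable from Spark SQL
through a HiveContext, e.g. (sketch):

  hiveContext.sql("SELECT name, age FROM people").show()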






How to implement an Evaluator for a ML pipeline?

2015-05-09 Thread Stefan H.
Hello everyone,

I am stuck with the (experimental, I think) API for machine learning
pipelines. I have a pipeline with just one estimator (ALS) and I want it to
try different values for the regularization parameter. Therefore I need to
supply an Evaluator that returns a value of type Double. I guess this could
be something like accuracy or mean squared error? The only implementation I
found is BinaryClassificationEvaluator, and I did not understand the
computation there.
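
What I had in mind is something like a (negated) root-mean-squared-error
evaluator, roughly like this (an untested sketch; the "rating" and "prediction"
column names, and the assumption that the largest metric wins, are mine):

  import org.apache.spark.ml.Evaluator   // moved to org.apache.spark.ml.evaluation in later releases
  import org.apache.spark.ml.param.ParamMap
  import org.apache.spark.sql.DataFrame

  val rmseEvaluator = new Evaluator {
    def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = {
      // mean squared difference between the observed rating and the prediction
      val mse = dataset.selectExpr("CAST(rating AS DOUBLE)", "CAST(prediction AS DOUBLE)")
        .map(row => { val d = row.getDouble(0) - row.getDouble(1); d * d })
        .mean()
      -math.sqrt(mse)   // negated, assuming the larger metric is treated as better
    }
  }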

I could not find detailed documentation so I implemented a dummy Evaluator
that just returns the regularization parameter:

  new Evaluator {
def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = 
  paramMap.get(als.regParam).getOrElse(throw new Exception)
  }

I just wanted to see whether the lower or higher value wins. On the
resulting model I inspected the chosen regularization parameter this way:

  cvModel.bestModel.fittingParamMap.get(als.regParam)

And it was the highest of my three regularization parameter candidates.
Strange thing is, if I negate the return value of the Evaluator, that line
still returns the highest regularization parameter candidate.

So I am probably working with false assumptions. I'd be grateful if someone
could point me to some documentation or examples, or has a few hints to
share.

Cheers,
Stefan






Spark can not access jar from HDFS !!

2015-05-09 Thread Ravindra
Hi All,

I am trying to create custom udfs with hiveContext as given below -
scala> hiveContext.sql("CREATE TEMPORARY FUNCTION sample_to_upper AS
'com.abc.api.udf.MyUpper' USING JAR
'hdfs:///users/ravindra/customUDF2.jar'")

I have put the udf jar in the hdfs at the path given above. The same
command works well in the hive shell but failing here in the spark shell.
And it fails as given below. -
15/05/10 00:41:51 ERROR Task: FAILED:
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR
hdfs:///users/ravindra/customUDF2.jar
15/05/10 00:41:51 INFO FunctionTask: create function:
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR
hdfs:///users/ravindra/customUDF2.jar
at
org.apache.hadoop.hive.ql.exec.FunctionTask.addFunctionResources(FunctionTask.java:305)
at
org.apache.hadoop.hive.ql.exec.FunctionTask.createTemporaryFunction(FunctionTask.java:179)
at org.apache.hadoop.hive.ql.exec.FunctionTask.execute(FunctionTask.java:81)
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
at
org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503)
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)
at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305)
at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276)
at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)
at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)
at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)
at
org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94)
at $line17.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:18)
at $line17.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:23)
at $line17.$read$$iwC$$iwC$$iwC$$iwC.init(console:25)
at $line17.$read$$iwC$$iwC$$iwC.init(console:27)
at $line17.$read$$iwC$$iwC.init(console:29)
at $line17.$read$$iwC.init(console:31)
at $line17.$read.init(console:33)
at $line17.$read$.init(console:37)
at $line17.$read$.clinit(console)
at $line17.$eval$.init(console:7)
at $line17.$eval$.clinit(console)
at $line17.$eval.$print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

15/05/10 00:41:51 ERROR Driver: FAILED: Execution 

spark and binary files

2015-05-09 Thread tog
Hi

I have an application that currently runs using MR. It starts by creating
business objects from information extracted from a proprietary binary file
that is copied to HDFS. Later those objects are used for further
processing, again with MR jobs.

I am planning to move towards Spark, and I clearly see that I could use a
JavaRDD<BusinessObject> for parallel processing. However, it is not yet
obvious what the process to generate this RDD from my binary file in
parallel would be.

Today I use parallelism based on the splits assigned to each of the map
tasks of a job. Can I mimic such a thing using Spark? All the examples I
have seen so far use text files, for which I guess the partitions are
based on a given number of contiguous lines.

Any help or pointer would be appreciated

Cheers
Guillaume


-- 
PGP KeyID: 2048R/EA31CFC9  subkeys.pgp.net


RE: Using Pandas/Scikit Learning in Pyspark

2015-05-09 Thread Felix C
Your Python job runs in a Python process that interacts with the JVM. You do need
a matching Python version and the other dependent packages on the driver and all
worker nodes if you run in YARN mode.
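
For example (a sketch; the Anaconda path is a placeholder), install the same
Anaconda on every node and point Spark at it, e.g. in conf/spark-env.sh on the
driver and all workers:

  export PYSPARK_PYTHON=/opt/anaconda/bin/python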

--- Original Message ---

From: Bin Wang binwang...@gmail.com
Sent: May 8, 2015 9:56 PM
To: Apache.Spark.User user@spark.apache.org
Subject: Using Pandas/Scikit Learning in Pyspark

Hey there,

I have a CDH cluster where the default Python installed on those Red Hat
Linux machines is Python 2.6.

I am thinking about developing a Spark application using PySpark, and I want
to be able to use the Pandas and scikit-learn packages. The Anaconda Python
interpreter has the most functionality out of the box; however, when I try to
use Anaconda Python 2.7, the Spark job won't run properly and fails because
the Python interpreter is not consistent across the cluster.
Here are my questions:

(1) I took a quick look at the source code of PySpark; it looks like, in the
end, it uses spark-submit. Doesn't that mean all the work will eventually be
translated into Scala code and the workload distributed across the whole
cluster? In that case, I should not have to worry about the Python interpreter
beyond the master node, right?

(2) If the Spark job needs consistent Python libraries installed on
every node, should I install Anaconda Python on all of them? If so, what is
the modern way of managing the Python ecosystem on the cluster?

I am a big fan of Python so please guide me.

Best regards,

Bin