custom join using complex keys
Hi folks, I need to join RDDs that have composite keys of the form (K1, K2, ..., Kn). The joining rules are:
* if left.K1 == right.K1, we have a true equality, and K2 ... Kn are all equal as well;
* if left.K1 != right.K1 but left.K2 == right.K2, we have a partial equality, and I also want the join to occur there;
* if the K2s don't match, I test K3, and so on.
Is there a way to implement a custom join with a predicate like this? (I would probably also need to provide a partitioner and a sorting predicate.) The left and right RDDs are 1-10 million lines long. Any ideas? Thanks Mathieu
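To pin down the rule before worrying about partitioners, here is a minimal sketch in plain Python (not Spark; the function names are illustrative only) of the cascading predicate, where a pair matches on the first key position that is equal:

```python
def cascading_match_level(left_keys, right_keys):
    """Return the first position i where left_keys[i] == right_keys[i],
    or None if no key position matches. Position 0 is the strongest match."""
    for i, (l, r) in enumerate(zip(left_keys, right_keys)):
        if l == r:
            return i
    return None

def cascading_join(left, right):
    """Naive O(n*m) join of two lists of key tuples, pairing rows on the
    strongest (lowest-index) key that matches, and recording that level."""
    matches = []
    for lk in left:
        for rk in right:
            level = cascading_match_level(lk, rk)
            if level is not None:
                matches.append((lk, rk, level))
    return matches

left = [("a", "x"), ("b", "y")]
right = [("a", "z"), ("c", "y")]
print(cascading_join(left, right))
# ("a","x")/("a","z") match on K1 (level 0); ("b","y")/("c","y") match on K2 (level 1)
```

At 1-10 million rows per side, one way to keep this partitionable is a sequence of ordinary keyed joins: join on K1, subtract the rows matched so far, join the remainder on K2, and so on down the key list.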
Spark Streaming closes with Cassandra Connector
I am trying to save some data to Cassandra from a Spark Streaming app: Messages.foreachRDD { ... CassandraRDD.saveToCassandra("test", "test") } When I run it, the app closes when I receive data, or it can't connect to Cassandra. Any ideas? Thanks -- Atte. Sergio Jiménez
Can NullWritable not be used in Spark?
Hi, experts. I wrote a Spark program that writes a sequence file. I found that if I use NullWritable as the key class of the SequenceFile, the program throws an exception, but if I use BytesWritable or Text as the key class, it does not. Does Spark not support the NullWritable class? The Spark version I use is 1.3.0 and the exception is as follows: ERROR yarn.ApplicationMaster: User class threw exception: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less; java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less; at dhao.test.SeqFile.TestWriteSeqFile02$.main(TestWriteSeqFile02.scala:21) at dhao.test.SeqFile.TestWriteSeqFile02.main(TestWriteSeqFile02.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
Re: Spark cannot access jar from HDFS !!
That code path is entirely delegated to Hive. Does Hive support this? You might instead try sparkContext.addJar. On Sat, May 9, 2015 at 12:32 PM, Ravindra ravindra.baj...@gmail.com wrote: Hi All, I am trying to create custom UDFs with hiveContext as given below - scala> hiveContext.sql("CREATE TEMPORARY FUNCTION sample_to_upper AS 'com.abc.api.udf.MyUpper' USING JAR 'hdfs:///users/ravindra/customUDF2.jar'") I have put the UDF jar in HDFS at the path given above. The same command works well in the hive shell but fails here in the spark shell, as given below. - 15/05/10 00:41:51 ERROR Task: FAILED: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR hdfs:///users/ravindra/customUDF2.jar 15/05/10 00:41:51 INFO FunctionTask: create function: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR hdfs:///users/ravindra/customUDF2.jar at org.apache.hadoop.hive.ql.exec.FunctionTask.addFunctionResources(FunctionTask.java:305) [remainder of quoted stack trace trimmed; the full trace appears in the original message below]
Re: spark and binary files
Spark uses any InputFormat you specify, and the number of splits equals the number of RDD partitions. You may want to take a deeper look at SparkContext.newAPIHadoopRDD to load your data. On Sat, May 9, 2015 at 4:48 PM, tog guillaume.all...@gmail.com wrote: Hi, I have an application that currently runs using MR. It starts by extracting information from a proprietary binary file that is copied to HDFS, creating business objects from the extracted information. Later those objects are used for further processing, again using MR jobs. I am planning to move towards Spark and I clearly see that I could use a JavaRDD<BusinessObject> for parallel processing; however, it is not yet obvious what the process could be to generate this RDD from my binary file in parallel. Today I use parallelism based on the split assigned to each of the map tasks of a job. Can I mimic such a thing using Spark? All the examples I have seen so far use text files, for which I guess the partitions are based on a given number of contiguous lines. Any help or pointer would be appreciated. Cheers Guillaume -- PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net -- Best Regards, Ayan Guha
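To make the InputFormat suggestion concrete, here is a hedged sketch in plain Python (not Spark, and assuming a simple fixed-size record layout rather than the actual proprietary format) of the record-splitting that a custom InputFormat's RecordReader would perform on each split:

```python
def split_fixed_records(data: bytes, record_size: int):
    """Split a byte buffer into fixed-size records, mimicking what a
    RecordReader does within one split. Trailing partial records are
    dropped here; a real reader must handle records straddling split
    boundaries (e.g. by reading past the split end for the last record)."""
    usable = len(data) - len(data) % record_size
    return [data[i:i + record_size] for i in range(0, usable, record_size)]

blob = bytes(range(10))              # a 10-byte "file"
records = split_fixed_records(blob, 4)
print(records)                       # two complete 4-byte records; last 2 bytes dropped
```

In Spark the same idea applies: each HDFS split becomes one partition, and the RecordReader turns the raw bytes of that split into your business objects.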
Re: Submit Spark application in cluster mode and supervised
Many thanks Silvio. What I found out later is that if there is a catastrophic failure and all the daemons fail at the same time, before any fail-over takes place, then when you bring the cluster back up the job resumes only on the Master it was last running on before the failure. Otherwise, during a partial failure, normal fail-over takes place and the driver is handed over to another Master. Which answers my initial question. Regards jk On Fri, May 8, 2015 at 7:34 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: If you’re using multiple masters with ZooKeeper then you should set your master URL to be spark://host01:7077,host02:7077 and the property spark.deploy.recoveryMode=ZOOKEEPER. See here for more info: http://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper From: James King Date: Friday, May 8, 2015 at 11:22 AM To: user Subject: Submit Spark application in cluster mode and supervised I have two hosts, host01 and host02 (let's call them). I run one Master and two Workers on host01. I also run one Master and two Workers on host02. Now I have one LIVE Master on host01 and a STANDBY Master on host02. The LIVE Master is aware of all Workers in the cluster. Now I submit a Spark application using bin/spark-submit --class SomeApp --deploy-mode cluster --supervise --master spark://host01:7077 Some.jar This is to make the driver resilient to failure. Now the interesting part: If I stop the cluster (all daemons on all hosts) and restart the Master and Workers *only* on host01, the job resumes, as expected. But if I stop the cluster (all daemons on all hosts) and restart the Master and Workers *only* on host02, the job *does not* resume execution! Why? I can see the driver listed on the host02 WebUI but no job execution. Please let me know why. Am I wrong to expect it to resume execution in this case?
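For reference, a sketch of the submit command with both masters listed, per Silvio's suggestion (the ZooKeeper quorum address and its property are assumptions here; substitute your own deployment's values):

```shell
# Listing both masters lets the driver and workers find whichever one is LIVE.
# zk01:2181,zk02:2181 is a placeholder ZooKeeper quorum -- adjust to yours.
bin/spark-submit \
  --class SomeApp \
  --deploy-mode cluster \
  --supervise \
  --master spark://host01:7077,host02:7077 \
  --conf spark.deploy.recoveryMode=ZOOKEEPER \
  --conf spark.deploy.zookeeper.url=zk01:2181,zk02:2181 \
  Some.jar
```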
Re: Duplicate entries in output of mllib column similarities
Hi Reza, After a bit of digging, I had my previous issue a little bit wrong. We're not getting duplicate (i,j) entries, but we are getting transposed entries (i,j) and (j,i) with potentially different scores. We assumed the output would be a triangular matrix. Still, let me know if that's expected. A transposed entry occurs for about 5% of our output entries. scala> matrix.entries.filter(x => (x.i, x.j) == (22769, 539029)).collect() res23: Array[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = Array(MatrixEntry(22769,539029,0.00453050595770095)) scala> matrix.entries.filter(x => (x.i, x.j) == (539029, 22769)).collect() res24: Array[org.apache.spark.mllib.linalg.distributed.MatrixEntry] = Array(MatrixEntry(539029,22769,0.002265252978850475)) I saved a subset of vectors to object files that replicates the issue. It's about 300 MB. Should I try to whittle that down some more? What would be the best way to get it to you? Many thanks, Rick On Thu, May 7, 2015 at 8:58 PM, Reza Zadeh r...@databricks.com wrote: This shouldn't be happening; do you have an example to reproduce it? On Thu, May 7, 2015 at 4:17 PM, rbolkey rbol...@gmail.com wrote: Hi, I have a question regarding one of the oddities we encountered while running mllib's column similarities operation. When we examine the output, we find duplicate matrix entries (the same i,j). Sometimes the entries have the same value/similarity score, but they're frequently different too. Is this a known issue? An artifact of the probabilistic nature of the output? Which output score should we trust (the lower or the higher one, when they differ)? We're using a threshold of 0.3, and running Spark 1.3.1 on a 10 node cluster. Thanks Rick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Duplicate-entries-in-output-of-mllib-column-similarities-tp22807.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
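As a stopgap until the cause is clear, a sketch (plain Python over (i, j, score) tuples, illustrative names) of collapsing transposed entries into upper-triangular form. Keeping the max score is an arbitrary choice here, since which estimate to trust is exactly the open question in this thread:

```python
def canonicalize_entries(entries):
    """Collapse transposed (i, j)/(j, i) matrix entries into upper-triangular
    form (i < j), keeping the higher score when both orientations appear."""
    best = {}
    for i, j, score in entries:
        key = (min(i, j), max(i, j))        # canonical orientation
        best[key] = max(score, best.get(key, float("-inf")))
    return [(i, j, s) for (i, j), s in sorted(best.items())]

entries = [(22769, 539029, 0.0045), (539029, 22769, 0.0023), (1, 2, 0.9)]
print(canonicalize_entries(entries))
# -> [(1, 2, 0.9), (22769, 539029, 0.0045)]
```

In Spark the same key-then-reduce shape would be a map to the canonical (min, max) key followed by reduceByKey(max).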
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL and Hive interoperability
Hi, try your first method but create an *external* table in Hive, like: hive -e "CREATE EXTERNAL TABLE people (name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';" -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-and-Hive-interoperability-tp22690p22828.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
How to implement an Evaluator for a ML pipeline?
Hello everyone, I am stuck with the (experimental, I think) API for machine learning pipelines. I have a pipeline with just one estimator (ALS) and I want it to try different values for the regularization parameter. Therefore I need to supply an Evaluator that returns a value of type Double. I guess this could be something like accuracy or mean squared error? The only implementation I found is BinaryClassificationEvaluator, and I did not understand the computation there. I could not find detailed documentation, so I implemented a dummy Evaluator that just returns the regularization parameter: new Evaluator { def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = paramMap.get(als.regParam).getOrElse(throw new Exception) } I just wanted to see whether the lower or higher value wins. On the resulting model I inspected the chosen regularization parameter this way: cvModel.bestModel.fittingParamMap.get(als.regParam) And it was the highest of my three regularization parameter candidates. The strange thing is, if I negate the return value of the Evaluator, that line still returns the highest regularization parameter candidate. So I am probably working from false assumptions. I'd be grateful if someone could point me to some documentation or examples, or share a few hints. Cheers, Stefan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-an-Evaluator-for-a-ML-pipeline-tp22830.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
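For an ALS pipeline, a natural metric is RMSE. Here is a sketch of just the metric logic in plain Python (illustrative, not the Spark Evaluator API); the point to note is that if the model selector picks the maximum metric value, an error metric like RMSE has to be negated before it is returned:

```python
import math

def rmse(predictions, labels):
    """Root mean squared error over paired predictions and labels."""
    assert predictions and len(predictions) == len(labels)
    return math.sqrt(sum((p - y) ** 2 for p, y in zip(predictions, labels))
                     / len(predictions))

def evaluate(predictions, labels):
    """Return a score where *larger is better*, as a maximizing model
    selector expects: the negated RMSE."""
    return -rmse(predictions, labels)

print(rmse([1.0, 2.0], [1.0, 4.0]))   # sqrt((0 + 4) / 2) = sqrt(2)
```

If the selector ignores the sign of your metric entirely (as the negation experiment above suggests), that would point to a bug or a misunderstanding of how the chosen parameters are reported, rather than to the metric itself.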
Spark cannot access jar from HDFS !!
Hi All, I am trying to create custom UDFs with hiveContext as given below - scala> hiveContext.sql("CREATE TEMPORARY FUNCTION sample_to_upper AS 'com.abc.api.udf.MyUpper' USING JAR 'hdfs:///users/ravindra/customUDF2.jar'") I have put the UDF jar in HDFS at the path given above. The same command works well in the hive shell but fails here in the spark shell, as given below. - 15/05/10 00:41:51 ERROR Task: FAILED: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR hdfs:///users/ravindra/customUDF2.jar 15/05/10 00:41:51 INFO FunctionTask: create function: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to load JAR hdfs:///users/ravindra/customUDF2.jar at org.apache.hadoop.hive.ql.exec.FunctionTask.addFunctionResources(FunctionTask.java:305) at org.apache.hadoop.hive.ql.exec.FunctionTask.createTemporaryFunction(FunctionTask.java:179) at org.apache.hadoop.hive.ql.exec.FunctionTask.execute(FunctionTask.java:81) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:305) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35) at org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35) at org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) at org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30) at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94) at $line17.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:18) at $line17.$read$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:23) at $line17.$read$$iwC$$iwC$$iwC$$iwC.init(console:25) at $line17.$read$$iwC$$iwC$$iwC.init(console:27) at $line17.$read$$iwC$$iwC.init(console:29) at $line17.$read$$iwC.init(console:31) at $line17.$read.init(console:33) at $line17.$read$.init(console:37) at $line17.$read$.clinit(console) at $line17.$eval$.init(console:7) at $line17.$eval$.clinit(console) at $line17.$eval.$print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 15/05/10 00:41:51 ERROR Driver: FAILED: Execution
spark and binary files
Hi, I have an application that currently runs using MR. It starts by extracting information from a proprietary binary file that is copied to HDFS, creating business objects from the extracted information. Later those objects are used for further processing, again using MR jobs. I am planning to move towards Spark and I clearly see that I could use a JavaRDD<BusinessObject> for parallel processing; however, it is not yet obvious what the process could be to generate this RDD from my binary file in parallel. Today I use parallelism based on the split assigned to each of the map tasks of a job. Can I mimic such a thing using Spark? All the examples I have seen so far use text files, for which I guess the partitions are based on a given number of contiguous lines. Any help or pointer would be appreciated. Cheers Guillaume -- PGP KeyID: 2048R/EA31CFC9 subkeys.pgp.net
RE: Using Pandas/Scikit Learning in Pyspark
Your Python job runs in a Python process interacting with the JVM. You do need a matching Python version and the other dependent packages on the driver and all worker nodes if you run in YARN mode. --- Original Message --- From: Bin Wang binwang...@gmail.com Sent: May 8, 2015 9:56 PM To: Apache.Spark.User user@spark.apache.org Subject: Using Pandas/Scikit Learning in Pyspark Hey there, I have a CDH cluster where the default Python installed on those Redhat Linux machines is Python 2.6. I am thinking about developing a Spark application using PySpark, and I want to be able to use the Pandas and scikit-learn packages. The Anaconda Python interpreter has the most functionality out of the box; however, when I try to use Anaconda Python 2.7, the Spark job won't run properly and fails because the Python interpreter is not consistent across the cluster. Here are my questions: (1) I took a quick look at the source code of PySpark; it looks like, in the end, it uses spark-submit. Doesn't that mean all the work will eventually be translated into Scala code that distributes the workload to the whole cluster? In that case, I should not have to worry about the Python interpreter beyond the master node, right? (2) If the Spark job needs a consistent set of Python libraries installed on every node, should I install Anaconda Python on all of them? If so, what is the modern way of managing the Python ecosystem on the cluster? I am a big fan of Python, so please guide me. Best regards, Bin
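One common approach for (2) is to install the same Anaconda on every node and point Spark at it. A sketch (the interpreter path and app name are assumptions; adjust to your installation):

```shell
# Point the PySpark driver and workers at one consistent interpreter.
# /opt/anaconda/bin/python is a placeholder -- it must exist on *every* node.
export PYSPARK_PYTHON=/opt/anaconda/bin/python

# On YARN, the application master's interpreter can be set explicitly as well:
spark-submit \
  --master yarn \
  --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/anaconda/bin/python \
  my_app.py
```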