Spark Shell No suitable driver found error
Hi All, I'm having trouble making an external jar available to the Spark shell. I used the --jars option while starting the Spark shell to make it available. When I run Class.forName("org.postgresql.Driver") it does not give any error, but when an action is performed on the RDD I get the typical No suitable driver found for jdbc:postgresql:// Please share a solution if anybody has faced and fixed the same. Regards, Satish Chandra
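One commonly suggested workaround for this class of error (a sketch only, with placeholder paths and connection details): in Spark 1.x, java.sql.DriverManager may not see jars added only via --jars, so also put the driver jar on the driver classpath and name the driver class explicitly when reading:

    // Sketch: jar path, host, db and table names are placeholders.
    // bin/spark-shell --jars /path/to/postgresql.jar --driver-class-path /path/to/postgresql.jar
    val df = sqlContext.read.format("jdbc").options(Map(
      "url"     -> "jdbc:postgresql://host:5432/mydb",
      "dbtable" -> "mytable",
      "driver"  -> "org.postgresql.Driver" // load the driver class explicitly on executors
    )).load()
    df.count() // the action that previously triggered "No suitable driver found"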
Re: Problem in Understanding concept of Physical Cores
Hi TD, Thanks for the elaboration. I have further doubts based on further tests that I did after your guidance.

Case 1: Standalone Spark -- In standalone mode, as you explained, spark-submit uses master local[*] implicitly, so it creates as many threads as the number of cores the VM has, but the user can control the number of partitions to be created, and tasks are created in accordance with the number of partitions.

Query 1: If I have 4 cores, then 4 threads will be created, but if I give 40 partitions to my data, then 40 tasks will be created which need to be executed on 4 threads. Does it work this way: 4 threads execute 4 tasks (out of 40) in parallel, and when the first set of tasks completes they pick the next 4 tasks, executing the remaining tasks in sequence? That is, 4 tasks run concurrently, but the rest of the tasks run in sequence once the first concurrent set completes.

Query 2: When we pass total-num-cores to Spark in standalone mode, the number of threads does not seem to increase. When I execute sc.defaultParallelism, the passed total-num-cores parameter does not seem to take any effect. So when we use this parameter, what does it exactly mean? Does it control the number of threads, or does it tell the Spark master to provide that many physical cores to this job? I mean, is this parameter relevant not for a single job, but, when multiple jobs are running in the cluster, to tell the Spark scheduler not to overallocate resources to a single job? Also, does setting this parameter guarantee any behavior, or is it only an indicator for the Spark scheduler?

Case 2: Spark on YARN -- In Spark on YARN, it seems that the number of threads created is not based on the number of underlying physical cores.

Query 3: It seems to be (defaultMinPartition * executor-cores). Is this understanding correct? If yes, does it mean the developer has control over the number of threads requested from Spark by passing the executor-cores option (which was not there in standalone mode, where the number of threads was based on the number of physical cores)? Is there a special reason for this kind of difference?

Query 4: Also, there seems to be a restriction on the value I can pass in the executor-cores option, which seems to depend on the underlying physical cores. For example, if I have 4 cores and I pass 20 then it works, but if I pass 100 then it does not work. So it seems the actual number of threads which can be created inside the JVM is still limited by the number of physical cores, but it can be controlled by the executor-cores option. Kindly elaborate on the best practice for how many threads to request based on physical cores, and how physical cores limit this behavior.

Query 5: Is there a reason for the difference in behavior between total-num-cores (does not create threads) in standalone mode and executor-cores (creates threads) in YARN mode in how threads are created? It seems in YARN mode we can create more threads in the same executor JVM compared to standalone mode for the same number of physical cores.

Thanks and Regards Aniruddh

On Thu, Jul 9, 2015 at 4:30 PM, Tathagata Das t...@databricks.com wrote: Query 1) What Spark runs is tasks in task slots; whatever the mapping of tasks to physical cores is, it does not matter. If there are two task slots (2 threads in local mode, or an executor with 2 task slots in distributed mode), it can only run two tasks concurrently. That is true even if the task is really not doing much. There is no multiplexing going on between tasks and task slots.
So to answer your query 1, there is 1 thread that is permanently allocated to the receiver task (a long-running task) even if it does not do much. There is no thread left to process the data that is being received. Query 2) I think this is already explained above. The receiver task is taking the only available slot, leaving nothing for the actual tasks to execute. This will work fine as long as there are n+1 threads, where n = number of receivers. Query 3) The 2nd thread will be running tasks that process the in-memory blocks of data generated by the receiver running on the first thread. Now if the operating system underneath has only one core (physical or virtual), then those two threads will be multiplexing the resources of that core. On Thu, Jul 9, 2015 at 1:41 AM, Aniruddh Sharma asharma...@gmail.com wrote: Thanks for the reply. I still have a confusion. Kindly find my understanding. Following is the code:

    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.socketTextStream("localhost", )
    lines.print()
    ssc.start()

Case 1: When I launch a VM with only 1 core and start spark-shell without any parameter, then as per the above explanation it uses local[*] implicitly and creates 1 thread, as the VM has 1 core. Query 1) But what does it try to execute in
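A quick way to observe the behaviour asked about in Query 1 (a sketch to run in a spark-shell started with --master local[4]; numbers are illustrative, not from the thread):

    // With local[4] there are 4 task slots; 40 partitions become 40 tasks that
    // run at most 4 at a time, in waves, until all 40 have finished (visible
    // in the web UI's task timeline for the stage).
    val rdd = sc.parallelize(1 to 1000, 40) // 40 partitions => 40 tasks per stage
    println(sc.defaultParallelism)          // 4 in local[4]: one slot per core
    rdd.map(_ * 2).count()                  // tasks execute 4 at a time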
RE: Building scaladoc using build/sbt unidoc failure
Hi, I am getting the same assertion error while trying to run build/sbt unidoc, as you described in "Building scaladoc using build/sbt unidoc failure". Could you tell me how you got it working? Thanks Regards, Meethu M
HiveContext with Cloudera Pseudo Cluster
Hi All, I am trying to run a simple join on Hive through the Spark shell on a pseudo Cloudera cluster on an Ubuntu machine:

    val hc = new HiveContext(sc);
    hc.sql("use testdb");

But it is failing with the message: org.apache.hadoop.hive.ql.parse.SemanticException: Database does not exist: testdb The same seems to work on the Hive console. Would appreciate any help to solve the same. Regards, Sukhi
Ipython notebook, ec2 spark cluster and matplotlib
Hello everybody, I'm running a two-node Spark cluster on EC2, created using the provided scripts. I then ssh into the master and invoke PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS='notebook --profile=pyspark' spark/bin/pyspark. This launches a Spark notebook which has been instructed to listen on all interfaces, not only localhost. I then open my browser and start playing around. All commands run fine as far as I've seen, but there's an annoying problem: I cannot display matplotlib graphs in a cell; I get the following error: TclError: no display name and no $DISPLAY environment variable. I've searched the web and I've tried the following two approaches: 1. use -X to enable X11 forwarding: when I use this option I get no error, a slow execution time, and no image at all; 2. use matplotlib.use('agg'): no image, but if I execute fig.savefig I can totally see the image being created. Has anybody had a similar problem? If so, can you help me troubleshoot? Thanks, MD
reduceByKeyAndWindow with initial state
We have a streaming job that makes use of reduceByKeyAndWindow https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L334-L341. We want this to work with an initial state. The idea is to avoid losing state if the streaming job is restarted, and also to take historical data into account for the windows. But reduceByKeyAndWindow doesn't accept any initialRDD parameter like updateStateByKey https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L435-L445 does. The plan is to extend reduceByKeyAndWindow to accept an initialRDDs parameter, so that the DStream starts with those RDDs as the initial value of generatedRDDs rather than an empty map. But generatedRDDs is a private variable, so I'm a bit confused about how to proceed with the plan.
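For reference, a sketch of how the existing initialRDD hook is used with updateStateByKey (Spark 1.4 API; the pair DStream `pairs`, the state type, and the path are illustrative assumptions):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD

    // Illustrative: historical (key, count) state saved by a previous run.
    val initial: RDD[(String, Long)] = sc.objectFile("hdfs:///state/latest")
    // pairs: DStream[(String, Long)], assumed to exist
    val counts = pairs.updateStateByKey(
      (values: Seq[Long], state: Option[Long]) => Some(values.sum + state.getOrElse(0L)),
      new HashPartitioner(sc.defaultParallelism),
      initial)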
Re: Accessing Spark Web UI from another place than where the job actually ran
Thank you for your answer! The problem is, I cannot ssh to the master directly. I have to ssh first to a frontend, then ssh to another frontend, and only from this last frontend can I ssh to my master. Can I do this by ssh-ing with -L to the first two frontends and then to the master? And maybe the traffic on the master's 8080 will be relayed master -> frontend1 -> frontend2 -> my PC? Thanks again, Roxana Roman 2015-07-10 9:53 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: When you connect to the machines you can create an ssh tunnel to access the UI: ssh -L 8080:127.0.0.1:8080 MasterMachinesIP And then you can simply open localhost:8080 in your browser and it should show the UI. Thanks Best Regards On Thu, Jul 9, 2015 at 7:44 PM, rroxanaioana rroxanaio...@gmail.com wrote: I have a Spark cluster with 1 master and 9 nodes. I am running in standalone mode. I do not have access to a web browser from any of the nodes in the cluster (I am connecting to the nodes through ssh -- it is a grid5000 cluster). I was wondering, is there any possibility to access the Spark web UI in this case? I tried copying the logs from my cluster into SPARK_PATH/work on my local machine (leaving the impression that the jobs that ran in the cluster were run on my local machine). This idea came after reading this part of the documentation: If an application has logged events over the course of its lifetime, then the Standalone master's web UI will automatically re-render the application's UI after the application has finished. But it did not work. What I can see in the UI is: Applications: 0 Running, 0 Completed; Drivers: 0 Running, 0 Completed; Status: ALIVE. Thank you! -- *Roxana Ioana Roman*
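A sketch of such a chained tunnel (user and hostnames are placeholders; each hop forwards port 8080 one step closer to the master, and each hop may prompt for credentials):

    ssh -L 8080:localhost:8080 user@frontend1 \
      ssh -L 8080:localhost:8080 user@frontend2 \
        ssh -N -L 8080:localhost:8080 user@master

After this, localhost:8080 on the local machine should reach the master's web UI through both frontends.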
Re: query on Spark + Flume integration using push model
Here's an example: https://github.com/przemek1990/spark-streaming Thanks Best Regards On Thu, Jul 9, 2015 at 4:35 PM, diplomatic Guru diplomaticg...@gmail.com wrote: Hello all, I'm trying to configure Flume to push data into a sink so that my streaming job can pick up the data. My events are in JSON format, but the Spark + Flume integration [1] document only refers to an Avro sink. [1] https://spark.apache.org/docs/latest/streaming-flume-integration.html I looked at some of the examples online, and they all refer to the avro type: agent.sinks.avroSink.type = avro If I set the type to avro and send the data in JSON, will it work? I'm unable to try this because the streaming job is throwing an Avro 'org.apache.flume.source.avro.AvroFlumeEvent' exception. Please advise how to handle this situation. many thanks
Re: Accessing Spark Web UI from another place than where the job actually ran
When you connect to the machines you can create an ssh tunnel to access the UI: ssh -L 8080:127.0.0.1:8080 MasterMachinesIP And then you can simply open localhost:8080 in your browser and it should show the UI. Thanks Best Regards On Thu, Jul 9, 2015 at 7:44 PM, rroxanaioana rroxanaio...@gmail.com wrote: I have a Spark cluster with 1 master and 9 nodes. I am running in standalone mode. I do not have access to a web browser from any of the nodes in the cluster (I am connecting to the nodes through ssh -- it is a grid5000 cluster). I was wondering, is there any possibility to access the Spark web UI in this case? I tried copying the logs from my cluster into SPARK_PATH/work on my local machine (leaving the impression that the jobs that ran in the cluster were run on my local machine). This idea came after reading this part of the documentation: If an application has logged events over the course of its lifetime, then the Standalone master's web UI will automatically re-render the application's UI after the application has finished. But it did not work. What I can see in the UI is: Applications: 0 Running, 0 Completed; Drivers: 0 Running, 0 Completed; Status: ALIVE. Thank you!
Re: DataFrame insertInto fails, saveAsTable works (Azure HDInsight)
It seems to be an issue with Azure; there was a discussion over here: https://azure.microsoft.com/en-in/documentation/articles/hdinsight-hadoop-spark-install/ Thanks Best Regards On Thu, Jul 9, 2015 at 9:42 PM, Daniel Haviv daniel.ha...@veracity-group.com wrote: Hi, I'm running Spark 1.4 on Azure. DataFrame's insertInto fails, but saveAsTable works. It seems like some issue with accessing Azure's blob storage, but that doesn't explain why one type of write works and the other doesn't. This is the stack trace:

    Caused by: org.apache.hadoop.fs.azure.AzureException: org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class.
      at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:938)
      at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:438)
      at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1048)
      at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
      at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
      at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
      at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2618)
      at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:417)
      at org.apache.hadoop.hive.shims.Hadoop23Shims.getNonCachedFileSystem(Hadoop23Shims.java:574)
      at org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3424)
      at org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3396)
      at org.apache.hadoop.hive.ql.Context.getScratchDir(Context.java:214)
      ... 59 more
    Caused by: org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class.
      at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.getAccountKeyFromConfiguration(AzureNativeFileSystemStore.java:829)
      at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:917)
      ... 70 more
    Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.ShellDecryptionKeyProvider not found
      at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
      at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.getAccountKeyFromConfiguration(AzureNativeFileSystemStore.java:826)
      ... 71 more

Thanks, Daniel
Re: Caching in spark
https://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory Thanks Best Regards On Fri, Jul 10, 2015 at 10:05 AM, vinod kumar vinodsachin...@gmail.com wrote: Hi Guys, Can any one please share me how to use caching feature of spark via spark sql queries? -Vinod
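A minimal sketch of what that page covers (Spark 1.x API; the table name is illustrative):

    // Cache a table so subsequent SQL queries read columnar in-memory data.
    sqlContext.cacheTable("people")                      // or: sqlContext.sql("CACHE TABLE people")
    sqlContext.sql("SELECT COUNT(*) FROM people").show()
    sqlContext.uncacheTable("people")                    // release the cached data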
Word2Vec distributed?
Hi, I've been experimenting with the Spark Word2Vec implementation in the MLlib package. It seems to me that only the preparatory steps are actually performed in a distributed way, i.e. stages 0-2 that prepare the data. In stage 3 (mapPartitionsWithIndex at Word2Vec.scala:312), only one node seems to be working, using one CPU. I suppose this is related to the discussion in [1], essentially stating that the original algorithm allows for multi-threading, but not for distributed computation, due to frequent internal communication. To my understanding, this issue has not been fully resolved in Spark, has it? I just wonder whether I am interpreting the current situation correctly. Thanks! Carsten [1] https://issues.apache.org/jira/browse/SPARK-2510 -- Carsten Schnober Doctoral Researcher, Ubiquitous Knowledge Processing (UKP) Lab, Technische Universität Darmstadt
Saving RDD into cassandra keyspace.
Hi, I am a beginner to Spark. I want to save each word and its count to a Cassandra keyspace. I wrote the following code:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import com.datastax.spark.connector._

    object SparkWordCount {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))
        val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))
        val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
        wordCounts.saveToCassandra("sparkdata", "words", SomeColumns("word", "count"));
      }
    }

and did spark-submit. The code doesn't work (maybe some very basic error, because I am new to it). I know there is the DataStax Cassandra connector, but how do I make the connection? What am I missing in my code? Thanks
Re: Breaking lineage and reducing stages in Spark Streaming
Thanks for the help Dean/TD, I was able to cut the lineage with checkpointing with the following code:

    dstream.countByValue().foreachRDD((rdd, time) => {
      val joined = rdd.union(current).reduceByKey(_+_, 2).leftOuterJoin(base)
      val toUpdate = joined.filter(myfilter).map(mymap)
      val toNotUpdate = joined.filter(mynotfilter).map(mymap)
      base = base.union(toUpdate).reduceByKey(_+_, 2)
      current = toNotUpdate
      if (time.isMultipleOf(duration)) {
        base.checkpoint()
        current.checkpoint()
      }
      println(toUpdate.count()) // to persistence
    })

Thanks, Anand

On 10 July 2015 at 02:16, Tathagata Das t...@databricks.com wrote: Summarizing the main problems discussed by Dean: 1. If you have an infinitely growing lineage, bad things will eventually happen. You HAVE TO periodically (say every 10th batch) checkpoint the information. 2. Unpersist the previous `current` RDD ONLY AFTER running an action on the `newCurrent`. Otherwise you are throwing current out of the cache before newCurrent has been computed. Modifying Dean's example:

    val newCurrent = rdd.union(current).reduceByKey(_+_)
    ... // join with newCurrent
    // collect or count or any action that uses newCurrent
    // Now you can unpersist, because newCurrent has been persisted and
    // won't require falling back to this cached current RDD.
    current.unpersist()

On Thu, Jul 9, 2015 at 6:36 AM, Dean Wampler deanwamp...@gmail.com wrote: I think you're complicating the cache behavior by aggressively re-using vars when temporary vals would be more straightforward. For example, newBase = newBase.unpersist()... effectively means that newBase's data is not actually cached when the subsequent .union(...) is performed, so it probably goes back to the lineage... Same with the current.unpersist logic before it. Names are cheap, so just use local vals:

    val newCurrent = rdd.union(current).reduceByKey(_+_)
    current.unpersist()

Also, what happens if you omit the 2 argument for the number of partitions in reduceByKey? Other minor points: I would change the joined, toUpdate, toNotUpdate logic to this:

    val joined = current.leftOuterJoin(newBase).map(mymap).cache()
    val toUpdate = joined.filter(myfilter).cache()
    val toNotUpdate = joined.filter(mynotfilter).cache()

Maybe it's just for this email example, but you don't need to call collect on toUpdate before using foreach(println). If the RDD is huge, you definitely don't want to do that. Hope this helps. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Jul 9, 2015 at 8:06 AM, Anand Nalya anand.na...@gmail.com wrote: Yes, myRDD is outside of the DStream. Following is the actual code, where newBase and current are the RDDs being updated with each batch:

    val base = sc.textFile...
    var newBase = base.cache()
    val dstream: DStream[String] = ssc.textFileStream...
    var current: RDD[(String, Long)] = sc.emptyRDD.cache()

    dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
      current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)
      val joined = current.leftOuterJoin(newBase).cache()
      val toUpdate = joined.filter(myfilter).map(mymap).cache()
      val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()
      toUpdate.collect().foreach(println) // this goes to some store
      newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()
      current = toNotUpdate.cache()
      toUpdate.unpersist()
      joined.unpersist()
      rdd.unpersist()
    })

Regards, Anand

On 9 July 2015 at 18:16, Dean Wampler deanwamp...@gmail.com wrote: Is myRDD outside a DStream? If so, are you persisting on each batch iteration? It should be checkpointed frequently too. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya anand.na...@gmail.com wrote: The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after union keeps the overall tuple count in myRDD fixed. Or, even with a fixed tuple count, will it keep consuming more resources? On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote: If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite resources, isn't it? On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya anand.na...@gmail.com wrote: That's from the Streaming tab of the Spark 1.4 web UI. On 9 July 2015 at 15:35, Michel
K Nearest Neighbours
Hi, I have the following problem, which is a kind of special case of k nearest neighbours. I have an Array of Vectors (v1) and an RDD[(Long, Vector)] of pairs of vectors with indexes (v2). The array v1 easily fits into a single node's memory (~100 entries), but v2 is very large (millions of entries). My goal is to find, for each vector in v1, the entries in v2 with the least distance. The naive solution would be to define a helper function that computes all the distances between a vector from v1 and all vectors in v2, sorts them, and returns the top n results:

    def computeDistances(vector: Vector, vectors: RDD[(Long, Vector)], n: Int = 10): Seq[Long] = {
      vectors.map { emb => (emb._1, Vectors.sqdist(emb._2, vector)) }
        .sortBy(_._2) // sort by value
        .map(_._1)    // retain indexes only
        .take(n)
    }

So I can map the entries (after zipping with indexes to keep track of the mappings) in v1 to the distances:

    v1.zipWithIndex.map { v => (computeDistances(v._1, v2), v._2) }

This gives me, for each entry in v1, the indexes of the n closest entries in v2. However, as v1 is an array, the computeDistances() calls are all done sequentially (on the driver, if I understand correctly) rather than distributed. The problem is that I must not convert v1 into an RDD, because that would result in an error due to nested RDD actions in computeDistances(). To conclude, what I would like to do (if it were possible) is this:

    val v1: Seq[Vector] = ...
    val v2: RDD[(Long, Vector)] = ...
    sc.parallelize(v1).zipWithIndex
      .map { v => (computeDistances(v._1, v2), v._2) }

Is there any good practice to approach problems like this? Thanks! Carsten -- Carsten Schnober Doctoral Researcher, Ubiquitous Knowledge Processing (UKP) Lab, Technische Universität Darmstadt
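One possible way to distribute this, as a sketch (not part of the original question): broadcast the small array v1 and compute all distances in a single pass over v2, keeping the n smallest per v1 index:

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.rdd.RDD

    def nearestN(sc: SparkContext, v1: Array[Vector],
                 v2: RDD[(Long, Vector)], n: Int = 10): RDD[(Int, Seq[Long])] = {
      val bc = sc.broadcast(v1) // ~100 vectors, cheap to ship to every executor
      v2.flatMap { case (id, vec) =>
          // one (v1-index, (distance, v2-id)) record per v1 entry
          bc.value.zipWithIndex.map { case (c, i) => (i, (Vectors.sqdist(c, vec), id)) }
        }
        .groupByKey() // only ~100 keys, but each collects all of v2; a bounded
                      // heap via aggregateByKey would scale better
        .mapValues(_.toSeq.sortBy(_._1).take(n).map(_._2))
    }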
Starting Spark-Application without explicit submission to cluster?
Hi, I am a bit confused about the steps I need to take to start a Spark application on a cluster. So far I had the impression from the documentation that I need to explicitly submit the application using, for example, spark-submit. However, from the SparkContext constructor signature I get the impression that maybe I do not have to do that after all: in http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.SparkContext the first constructor has (among other things) a parameter 'jars' which indicates the collection of JARs to send to the cluster. To me this suggests that I can simply start the application anywhere and that it will deploy itself to the cluster in the same way a call to spark-submit would. Is that correct? If not, can someone explain why I can/need to provide master and jars etc. in the call to SparkContext, since they essentially only duplicate what I would specify in the call to spark-submit? Jan
SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use when running spark-shell
Hi, I am running a single spark-shell but observing this error when I execute val sc = new SparkContext(conf):

    15/07/10 15:42:56 WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use
    java.net.BindException: Address already in use
      at sun.nio.ch.Net.bind0(Native Method)
      at sun.nio.ch.Net.bind(Net.java:444)
      at sun.nio.ch.Net.bind(Net.java:436)
      at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
      at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
      at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
      at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
      at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
      at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
      at org.eclipse.jetty.server.Server.doStart(Server.java:293)
      at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)

Has anyone come across the same issue? Any suggestions? Thanks
Re: SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use when running spark-shell
That's because sc is already initialized in spark-shell. You can do sc.stop() before you initialize another one. Thanks Best Regards On Fri, Jul 10, 2015 at 3:54 PM, Prateek . prat...@aricent.com wrote: Hi, I am running a single spark-shell but observing this error when I execute val sc = new SparkContext(conf): 15/07/10 15:42:56 WARN AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use ... (full stack trace quoted above) ... Has anyone come across the same issue? Any suggestions? Thanks
Best way to avoid updateStateByKey from running without data
UpdateStateByKey will run the update function on every interval, even if the incoming batch is empty. Is there a way to prevent that? If the incoming DStream contains no RDDs (or RDDs of count 0) then I don't want my update function to run. Note that this is different from running the update function and writing an if (!newValues.isEmpty) inside the function. Thanks, Michael - Michael Vogiatzis @mvogiatzis
How to restrict disk space for spark caches on yarn?
Hi, I have a Spark ML workflow. It uses some persist calls. When I launch it with a 1 TB dataset, it brings down the whole cluster, because it fills all the disk space at /yarn/nm/usercache/root/appcache: http://i.imgur.com/qvRUrOp.png I found a YARN setting, yarn.nodemanager.localizer.cache.target-size-mb: "Target size of localizer cache in MB, per nodemanager. It is a target retention size that only includes resources with PUBLIC and PRIVATE visibility and excludes resources with APPLICATION visibility." But it excludes resources with APPLICATION visibility, and the Spark cache, as I understand it, is of APPLICATION type. Is it possible to restrict disk space for a Spark application? Will Spark fail if it isn't able to persist on disk (StorageLevel.MEMORY_AND_DISK_SER), or will it recompute from the data source? Thanks, Peter Rudenko
Re: query on Spark + Flume integration using push model
Hi Akhil, thank you for your reply. Does that mean that Spark Streaming out of the box only supports Avro? If that's the case, then why only Avro? Is there a particular reason? The project linked is in Scala, but I'm using Java. Is there another project? On 10 July 2015 at 08:46, Akhil Das ak...@sigmoidanalytics.com wrote: Here's an example: https://github.com/przemek1990/spark-streaming Thanks Best Regards On Thu, Jul 9, 2015 at 4:35 PM, diplomatic Guru diplomaticg...@gmail.com wrote: Hello all, I'm trying to configure Flume to push data into a sink so that my streaming job can pick up the data. My events are in JSON format, but the Spark + Flume integration [1] document only refers to an Avro sink. [1] https://spark.apache.org/docs/latest/streaming-flume-integration.html I looked at some of the examples online, and they all refer to the avro type: agent.sinks.avroSink.type = avro If I set the type to avro and send the data in JSON, will it work? I'm unable to try this because the streaming job is throwing an Avro 'org.apache.flume.source.avro.AvroFlumeEvent' exception. Please advise how to handle this situation. many thanks
spark-submit
When I run this command:

    ashutosh@pas-lab-server7:~/spark-1.4.0$ ./bin/spark-submit \
      --class org.apache.spark.graphx.lib.Analytics \
      --master spark://172.17.27.12:7077 \
      assembly/target/scala-2.10/spark-assembly-1.4.0-hadoop2.2.0.jar \
      pagerank soc-LiveJournal1.txt --numEPart=100 --nverts=4847571 --numIter=10 --partStrategy=EdgePartition2D

I get an error:

    java.lang.ClassNotFoundException: org.apache.spark.graphx.lib.Analytics
      at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
      at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
      at java.security.AccessController.doPrivileged(Native Method)
      at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
      at java.lang.Class.forName0(Native Method)
      at java.lang.Class.forName(Class.java:274)
      at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:633)
      at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
      at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    15/07/10 15:31:35 INFO Utils: Shutdown hook called

Where is this class? What path should I give?
Spark performance
Hi everyone, I have planned to move from MS SQL Server to Spark. I am using around 50,000 to 1 lakh (100,000) records. Spark performance is slow when compared to MS SQL Server. What is the best database (Spark or SQL) to store and retrieve around 50,000 to 100,000 records? regards, Ravi
Re: Saving RDD into cassandra keyspace.
I would strongly encourage you to read the docs; they are very useful in getting up and running: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md For your use case shown above, you will need to ensure that you include the appropriate version of the spark-cassandra-connector assembly jar when you submit the job. The version you use should correspond to the version of Spark you are running. In addition, you will want to ensure that you set spark.cassandra.connection.host as shown below, prior to creating the SparkContext:

    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")

HTH -Todd On Fri, Jul 10, 2015 at 5:24 AM, Prateek . prat...@aricent.com wrote: Hi, I am a beginner to Spark. I want to save each word and its count to a Cassandra keyspace. I wrote the following code (quoted in full above) and did spark-submit. The code doesn't work (maybe some very basic error, because I am new to it). I know there is the DataStax Cassandra connector, but how do I make the connection? What am I missing in my code? Thanks
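Putting those pieces together, a sketch (the connector coordinate and version below are assumptions; pick the release matching your Spark version):

    // spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M1 \
    //   --class SparkWordCount wordcount.jar input.txt
    import com.datastax.spark.connector._
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf(true)
      .setAppName("Spark Count")
      .set("spark.cassandra.connection.host", "127.0.0.1") // a Cassandra contact point
    val sc = new SparkContext(conf)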
RE: Saving RDD into cassandra keyspace.
Hi, Thanks Todd, the link is really helpful to get started. ☺ -Prateek From: Todd Nist [mailto:tsind...@gmail.com] Sent: Friday, July 10, 2015 4:43 PM To: Prateek . Cc: user@spark.apache.org Subject: Re: Saving RDD into cassandra keyspace. I would strongly encourage you to read the docs; they are very useful in getting up and running: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/0_quick_start.md ... (reply quoted in full above)
RE: SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use when running spark-shell
Thanks Akhil! I got it. ☺ From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Friday, July 10, 2015 4:02 PM To: Prateek . Cc: user@spark.apache.org Subject: Re: SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use when running spark-shell That's because sc is already initialized. You can do sc.stop() before you initialize another one. ... (reply and stack trace quoted in full above)
Re: Debug Spark Streaming in PyCharm
spark-submit does a lot of magic configuration (classpaths etc.) underneath the covers to enable pyspark to find the Spark JARs, etc. I am not sure how you can start running things directly from the PyCharm IDE; others in the community may be able to answer. For now, the main way to run pyspark programs is through spark-submit, or pyspark (which uses spark-submit underneath). On Fri, Jul 10, 2015 at 6:28 AM, blbradley bradleytas...@gmail.com wrote: Hello, I'm trying to debug a PySpark app with Kafka Streaming in PyCharm. However, PySpark cannot find the jar dependencies for Kafka Streaming without editing the program. I can temporarily use SparkConf to set 'spark.jars', but I'm using Mesos for production and don't want to edit my program every time I want to debug. I'd like to find a way to debug without editing the source. Here's what my PyCharm debug execution command looks like: /home/brandon/.pyenv/versions/coinspark/bin/python2.7 /opt/pycharm-community/helpers/pydev/pydevd.py --multiproc --client 127.0.0.1 --port 59042 --file /home/brandon/src/coins/coinspark/streaming.py I might be able to use spark-submit as the command PyCharm runs, but I'm not sure if that will work with the debugger. Thoughts? Cheers! Brandon Bradley
Ordering of Batches in Spark streaming
Hey, is there any *guarantee of fixed ordering among the batches/RDDs*? After searching a lot I found there is no ordering by default (from the framework itself), not only *batch-wise* but *also within batches*. But I wonder whether there is any change from old Spark versions to Spark 1.4 in this context. Any comments please!! -- Thanks Regards, Anshu Shukla
Re: Spark GraphX memory requirements + java.lang.OutOfMemoryError: GC overhead limit exceeded
Hello again. So I could compute triangle numbers when running the code from the Spark shell without workers (with the --driver-memory 15g option), but with workers I get errors. I run the Spark shell:

    ./bin/spark-shell --master spark://192.168.0.31:7077 --executor-memory 6900m --driver-memory 15g

and the workers (by hand):

    ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://192.168.0.31:7077

(2 workers, each with 8 GB RAM; the master has 32 GB RAM). The code now is:

    import org.apache.spark._
    import org.apache.spark.graphx._
    val graph = GraphLoader.edgeListFile(sc, "/home/data/graph.txt").partitionBy(PartitionStrategy.RandomVertexCut)
    val newgraph = graph.convertToCanonicalEdges()
    val triangleNum = newgraph.triangleCount().vertices.map(x => x._2.toLong).reduce(_ + _) / 3

So how do I work out how much memory is needed? And why do I need so much? The dataset is only 1.1 GB... Error:

    [Stage 7: (0 + 8) / 32]15/07/11 01:48:45 WARN TaskSetManager: Lost task 2.0 in stage 7.0 (TID 130, 192.168.0.28): io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError
      at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:153)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
      at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
      at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
      at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
      at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.OutOfMemoryError
      at sun.misc.Unsafe.allocateMemory(Native Method)
      at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
      at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
      at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:440)
      at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:187)
      at io.netty.buffer.PoolArena.allocate(PoolArena.java:165)
      at io.netty.buffer.PoolArena.reallocate(PoolArena.java:277)
      at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:108)
      at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
      at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:146)
      ... 10 more

On 26 June 2015 at 14:06, Roman Sokolov ole...@gmail.com wrote: Yep, I already found it. So I added one line:

    val graph = GraphLoader.edgeListFile(sc, , ...)
    val newgraph = graph.convertToCanonicalEdges()

and could successfully count triangles on newgraph. Next I will test it on bigger (several GB) networks. I am using Spark 1.3 and 1.4 but haven't seen this function in https://spark.apache.org/docs/latest/graphx-programming-guide.html Thanks a lot guys!
On 26.06.2015 at 13:50, Ted Yu yuzhih...@gmail.com wrote: See SPARK-4917, which went into Spark 1.3.0. On Fri, Jun 26, 2015 at 2:27 AM, Robin East robin.e...@xense.co.uk wrote: You'll get this issue if you just take the first 2000 lines of that file. The problem is triangleCount() expects srcId < dstId, which is not the case in the file (e.g. vertex 28). You can get round this by calling graph.convertToCanonicalEdges(), which removes bi-directional edges and ensures srcId < dstId. Which version of Spark are you on? Can't remember what version that method was introduced in. Robin On 26 Jun 2015, at 09:44, Roman Sokolov ole...@gmail.com wrote: Ok, but what does it mean? I did not change the core files of Spark, so is it a bug there? PS: on small datasets (500 MB) I have no problem. On 25.06.2015 at 18:02, Ted Yu yuzhih...@gmail.com wrote: The assertion failure from TriangleCount.scala corresponds with the following lines:

    g.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
      val dblCount = optCounter.getOrElse(0)
      // double count should be even (divisible by two)
      assert((dblCount & 1) == 0)

Cheers On Thu, Jun 25, 2015 at 6:20 AM, Roman Sokolov ole...@gmail.com wrote: Hello! I am trying to compute the number of triangles with GraphX, but I get a memory error or heap-size error, even though the dataset is very small (1 GB). I run the code in spark-shell on a 16 GB RAM machine (also tried with 2 workers on separate machines, 8 GB RAM each). So
Re: How do we control output part files created by Spark job?
Is there a join involved in your SQL? Have a look at spark.sql.shuffle.partitions. Srikanth On Wed, Jul 8, 2015 at 1:29 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Srikanth, thanks for the response. I have the following code: hiveContext.sql("insert into ...").coalesce(6) The above code does not create 6 part files; it creates around 200 small files. Please guide. Thanks. On Jul 8, 2015 4:07 AM, Srikanth srikanth...@gmail.com wrote: Did you do yourRdd.coalesce(6).saveAsTextFile() or yourRdd.coalesce(6); yourRdd.saveAsTextFile()? Srikanth On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi, I tried both approaches, using df.repartition(6) and df.coalesce(6); neither reduces the part-xxxxx files. Even after calling the above methods I still see around 200 small part files of size 20 MB each, which are again ORC files. On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Try the coalesce function to limit the number of part files. On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote: Hi, I have a couple of Spark jobs which process thousands of files every day. File sizes may vary from MBs to GBs. After finishing a job I usually save using the following code: finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR dataFrame.write.format("orc").save("/path/in/hdfs") // storing as ORC file as of Spark 1.4 The Spark job creates plenty of small part files in the final output directory. As far as I understand, Spark creates a part file for each partition/task; please correct me if I am wrong. How do we control the number of part files Spark creates? Finally, I would like to create a Hive table using these parquet/ORC directories, and I heard Hive is slow when we have a large number of small files. Please guide, I am new to Spark. Thanks in advance.
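A sketch of the two knobs discussed in this thread (query and paths are illustrative): coalesce returns a new DataFrame that must be the object you save, and the ~200 files typically come from the default spark.sql.shuffle.partitions used by joins/aggregations:

    // 1) coalesce is not in-place; save the coalesced result:
    val df = hiveContext.sql("SELECT ...")          // your query
    df.coalesce(6).write.format("orc").save("/path/in/hdfs")

    // 2) Joins/aggregations inside the SQL default to 200 shuffle partitions,
    //    hence ~200 part files; lower it before running the query:
    hiveContext.setConf("spark.sql.shuffle.partitions", "6")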
Is it possible to change the default port number 7077 for spark?
Hello all, In my lab a colleague installed and configured Spark 1.3.0 on a 4-node cluster in a CDH 5.4 environment. The default port number for our Spark configuration is 7456. I have been trying to SSH to the spark-master using this port number, but it fails every time, giving the error "JVM is timed out". After reading the documentation given by Cloudera http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/cdh_ig_ports_cdh5.html it says that the default port number for the Spark configuration should be 7077, and that is what I see in all the posts here and elsewhere in Google search results. So now I have three questions; please help with all or any of them: Q1) Will the Spark configuration work only with port number 7077? If yes, then how can I change the port number? Q2) Do I need to install Spark on all the machines in the cluster? Q3) To run any Spark job, do I always have to SSH into the spark-master machine? Or is it possible to connect my laptop to the spark-master and invoke commands from my laptop to the spark-master and worker machines? Thank you for your time. Ashish
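On Q1, in standalone mode the master's port is configurable rather than fixed at 7077; a sketch of conf/spark-env.sh on the master (values illustrative). Note also that SSH itself listens on port 22, not on the Spark master port:

    # conf/spark-env.sh on the master node (standalone mode)
    export SPARK_MASTER_PORT=7456        # RPC port workers/drivers connect to (default 7077)
    export SPARK_MASTER_WEBUI_PORT=8080  # web UI port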
Re: Ordering of Batches in Spark streaming
Thanks Ayan, I was curious to know *how Spark does it*. Is there any *documentation* where I can get the details about that? Will you please point me to some detailed link etc.? Maybe it does something like *transactional topologies in Storm* (https://storm.apache.org/documentation/Transactional-topologies.html). On Sat, Jul 11, 2015 at 9:13 AM, ayan guha guha.a...@gmail.com wrote: AFAIK, it is guaranteed that batch t+1 will not start processing until batch t is done. Ordering within a batch -- what do you mean by that? In essence, the (mini) batch will get distributed in partitions like a normal RDD, so rdd.zipWithIndex should give a way to order them by the time they are received. On Sat, Jul 11, 2015 at 12:50 PM, anshu shukla anshushuk...@gmail.com wrote: Hey, is there any *guarantee of fixed ordering among the batches/RDDs*? After searching a lot I found there is no ordering by default (from the framework itself), not only *batch-wise* but *also within batches*. But I wonder whether there is any change from old Spark versions to Spark 1.4 in this context. Any comments please!! -- Thanks Regards, Anshu Shukla -- Best Regards, Ayan Guha -- Thanks Regards, Anshu Shukla
Re: JAR containing org.apache.hadoop.mapreduce.lib.input.FileInputFormat
For hadoop 2.x:

    jar tvf ~/2-hadoop/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/target/hadoop-mapreduce-client-core-2.8.0-SNAPSHOT.jar | grep FileInputFormat.class
    ...
    17552 Fri Apr 24 15:57:54 PDT 2015 org/apache/hadoop/mapreduce/lib/input/FileInputFormat.class

FYI On Fri, Jul 10, 2015 at 3:42 PM, Lincoln Atkinson lat...@microsoft.com wrote: Sorry, only indirectly Spark-related. I've been attempting to create a .NET proxy for spark-core, using JNI4NET. At the moment I'm stuck with the following error when running the proxy generator: java.lang.NoClassDefFoundError: org.apache.hadoop.mapreduce.lib.input.FileInputFormat I've resolved similar issues by finding the appropriate JAR and adding it to the classpath. But so far I haven't been able to figure out where this one comes from. Does anyone know what JAR this is from? It's not hadoop-common, hadoop-mapreduce-client-core, hadoop-mapreduce-client-common, hadoop-mapreduce-client-app, or hadoop-mapreduce-client-jobclient. Thanks, -Lincoln
RE: Spark performance
Hi Ravi, First, neither Spark nor Spark SQL is a database. Both are compute engines, which need to be paired with a storage system. Second, they are designed for processing large distributed datasets. If you have only 100,000 records or even a million records, you don't need Spark. An RDBMS will perform much better for that volume of data. Mohammed From: Ravisankar Mani [mailto:rrav...@gmail.com] Sent: Friday, July 10, 2015 3:50 AM To: user@spark.apache.org Subject: Spark performance Hi everyone, I have planned to move from MS SQL Server to Spark. I am using around 50,000 to 1 lakh (100,000) records. Spark performance is slow when compared to MS SQL Server. What is the best database (Spark or SQL) to store or retrieve around 50,000 to 1 lakh records? regards, Ravi
Re: reduceByKeyAndWindow with initial state
Are you talking about reduceByKeyAndWindow with or without inverse reduce? TD On Fri, Jul 10, 2015 at 2:07 AM, Imran Alam im...@newscred.com wrote: We have a streaming job that makes use of reduceByKeyAndWindow https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L334-L341. We want this to work with an initial state. The idea is to avoid losing state if the streaming job is restarted, and also to take historical data into account for the windows. But reduceByKeyAndWindow doesn't accept any initialRDD parameter like updateStateByKey https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L435-L445 does. The plan is to extend reduceByKeyAndWindow to accept an initialRDDs parameter, so that the DStream starts with those RDDs as the initial value of generatedRDD rather than an empty map. But generatedRDD is a private variable, so I'm a bit confused about how to proceed with the plan.
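For context, a hedged sketch of the two variants TD is distinguishing; pairs is a hypothetical DStream[(String, Int)]:

import org.apache.spark.streaming.Seconds
// without inverse reduce: the whole window is recomputed on every slide
val counts1 = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(10))
// with inverse reduce: new data is added and old data is "subtracted" incrementally; requires checkpointing
val counts2 = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(10))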
Linear search between particular log4j log lines
Hello, I have a very specific question on how to do a search between particular lines of a log file. I did some research to find the answer, and what I learned is that if a shuffle operation is applied to an RDD, there is no way to reconstruct the sequence of lines (except by zipping with an id). I'm looking for any useful approaches/workarounds other developers use to solve that problem. Here is a sample: I have log4j log files where for each request/transaction a specific BEGIN and END transaction marker is printed. Somewhere in between, other classes may report useful statistics, which I need to parse, and unfortunately there is no way to keep the transaction id with those records. What is the best approach to link a transaction with a particular line between the BEGIN and END markers? Assume only the timestamp and thread name are available:

2015-01-01 20:00:00 DEBUG className [Thread-0] - BEGIN TransactionID=AA1
2015-01-01 20:00:00 DEBUG className [Thread-0] - ... {some other logs}
2015-01-01 20:00:01 DEBUG className [Thread-0] - SQL execution time: 500ms
2015-01-01 20:00:02 DEBUG className [Thread-0] - ... {some other logs}
2015-01-01 20:00:05 DEBUG className [Thread-0] - END

Finally, I want to get the result with transaction ID AA1 and SQL execution time 500ms. Probably another good example would be extracting a java stacktrace from logs, when the stacktrace lines wouldn't have any key strings (timestamp, thread id) at all to parse by. So far I've come up with one idea and one approach: 1) Find out the file and position of the BEGIN line and run a separate non-Spark process to parse it line-by-line. In this case the question is: what is the best approach to know which file the line belongs to, and what is its position? Is zipWithUniqueId helpful for that? Not sure if it's really effective and can help to find the file name (or maybe the hadoop partition). 2) I use the thread id as a key and map that key with the BEGIN / END lines. Then I create another RDD with the same key, but for the SQL execution time line. Then I do a left join of the RDDs by thread id and filter by the timestamps coming from both RDDs, leaving only the SQL line which is prior to the END line (the SQL's timestamp is before the END's timestamp). An approach like this becomes very confusing in cases where it's required to extract more information (lines) between BEGIN/END. Are there any recommendations on how to handle cases like that? Thank you, Sergey
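A hedged sketch of approach 2 (not from the thread; the parse function and marker prefixes are assumptions about the log format), keying by thread name and ordering each thread's lines by timestamp:

import org.apache.spark.rdd.RDD
case class LogLine(thread: String, ts: Long, msg: String)
// parse is a hypothetical parser extracting thread name, timestamp and message from a raw line
val lines: RDD[LogLine] = sc.textFile("app.log").map(parse)
val linked: RDD[(String, String)] = lines.groupBy(_.thread).flatMap { case (_, ls) =>
  // assumes one thread's lines fit in memory and that timestamps order them correctly
  var txn: Option[String] = None
  ls.toSeq.sortBy(_.ts).flatMap { l =>
    if (l.msg.startsWith("BEGIN")) { txn = Some(l.msg.stripPrefix("BEGIN TransactionID=")); None }
    else if (l.msg.startsWith("END")) { txn = None; None }
    else txn.map(id => (id, l.msg))   // tag inner lines with the currently open transaction id
  }
}
// linked would then contain e.g. ("AA1", "SQL execution time: 500ms")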
Re: Ordering of Batches in Spark streaming
AFAIK, it is guaranteed that batch t+1 will not start processing until batch t is done. Ordering within a batch - what do you mean by that? In essence, the (mini) batch will get distributed in partitions like a normal RDD, so following rdd.zipWithIndex should give a way to order them by the time they are received. On Sat, Jul 11, 2015 at 12:50 PM, anshu shukla anshushuk...@gmail.com wrote: Hey, is there any guarantee of fixed ordering among the batches/RDDs? After searching a lot I found there is no ordering by default (from the framework itself), not only batch-wise but also within batches. But I wonder whether there is any change from old Spark versions to Spark 1.4 in this context. Any comments please!! -- Thanks Regards, Anshu Shukla -- Best Regards, Ayan Guha
Re: [Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?
Michael, Thanks - Terry Michael Armbrust mich...@databricks.com wrote on Saturday, July 11, 2015, 04:02: Metastore configuration should be set in hive-site.xml. On Thu, Jul 9, 2015 at 8:59 PM, Terry Hole hujie.ea...@gmail.com wrote: Hi, I am trying to set the hive metadata destination to a mysql database in hive context; it works fine in spark 1.3.1, but it seems broken in spark 1.4.1-rc1, where it always connects to the default metadata (local). Is this a regression, or must we set the connection in hive-site.xml? The code is very simple in spark shell:

import org.apache.spark.sql.hive._
val hiveContext = new HiveContext(sc)
hiveContext.setConf("javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver")
hiveContext.setConf("javax.jdo.option.ConnectionUserName", "hive")
hiveContext.setConf("javax.jdo.option.ConnectionPassword", "hive")
hiveContext.setConf("javax.jdo.option.ConnectionURL", "jdbc:mysql://10.111.3.186:3306/hive")
hiveContext.setConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
hiveContext.sql("select * from mysqltable").show()

Thanks! -Terry
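For reference, a minimal hive-site.xml sketch along the lines Michael suggests, reusing the values from Terry's snippet (treat the file layout as an assumption, not the thread's verified fix):

<configuration>
  <property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:mysql://10.111.3.186:3306/hive</value></property>
  <property><name>javax.jdo.option.ConnectionDriverName</name><value>com.mysql.jdbc.Driver</value></property>
  <property><name>javax.jdo.option.ConnectionUserName</name><value>hive</value></property>
  <property><name>javax.jdo.option.ConnectionPassword</name><value>hive</value></property>
</configuration>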
Re: Is it possible to change the default port number 7077 for spark?
SSH by default should be on port 22. 7456 is the port where the master is listening, so any Spark app should be able to connect to the master using that port. On 11 Jul 2015 13:50, ashishdutt ashish.du...@gmail.com wrote: Hello all, In my lab a colleague installed and configured Spark 1.3.0 on a 4-node cluster in a CDH 5.4 environment. The default port number for our Spark configuration is 7456. I have been trying to SSH to spark-master using this port number but it fails every time with the error "JVM is timed out". After reading the documentation given by Cloudera http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/cdh_ig_ports_cdh5.html it says that the default port number for the Spark configuration should be 7077, and that is what I see in all the posts here and elsewhere in Google search results. So now I have three questions, please, if anyone can help me answer all or any of them: Q1) Will the Spark configuration work only with port number 7077? If yes, then how can I change the port number? Q2) Do I need to install Spark on all the machines in the cluster? Q3) To run any Spark job, do I always have to SSH into the spark-master machine, or is it possible to connect my laptop to the spark-master and invoke commands from my laptop to the spark-master and worker machines? Thank you for your time. Ashish
Re: Apache Spark : Custom function for reduceByKey - missing arguments for method
Did you try it by adding the `_` after the method names to partially apply them? Scala is saying that it's trying to immediately apply those methods but can't find arguments, while you instead are trying to pass them along as functions (which they aren't). Here is a link to a stackoverflow answer that should help clarify: http://stackoverflow.com/a/19720808/72401. I think there are two solutions: turn getMax and getMin into functions by using val, like:

val getMax: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a, b) => if (a > b) a else b }
val getMin: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a, b) => if (a < b) a else b }

or just partially apply them:

maxVector = attribMap.reduceByKey( getMax _ )
minVector = attribMap.reduceByKey( getMin _ )

On Thu, Jul 9, 2015 at 9:09 PM, ameyamm ameya.malond...@outlook.com wrote: I am trying to normalize a dataset (convert values for all attributes in the vector to the 0-1 range). I created an RDD of tuples (attrib-name, attrib-value) for all the records in the dataset as follows:

val attribMap : RDD[(String, DoubleDimension)] = contactDataset.flatMap( contact =>
  List(
    ("dage", contact.dage match { case Some(value) => DoubleDimension(value); case None => null }),
    ("dancstry1", contact.dancstry1 match { case Some(value) => DoubleDimension(value); case None => null }),
    ("dancstry2", contact.dancstry2 match { case Some(value) => DoubleDimension(value); case None => null }),
    ("ddepart", contact.ddepart match { case Some(value) => DoubleDimension(value); case None => null }),
    ("dhispanic", contact.dhispanic match { case Some(value) => DoubleDimension(value); case None => null }),
    ("dhour89", contact.dhour89 match { case Some(value) => DoubleDimension(value); case None => null })
  )
)

Here, contactDataset is of the type RDD[Contact]. The fields of the Contact class are of type Option[Long]. DoubleDimension is a simple wrapper over the Double datatype. It extends the Ordered trait and implements the corresponding compare and equals methods. To obtain the max and min attribute vectors for computing the normalized values:

maxVector = attribMap.reduceByKey( getMax )
minVector = attribMap.reduceByKey( getMin )

Implementation of getMax and getMin is as follows:

def getMax( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = { if (a > b) a else b }
def getMin( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = { if (a < b) a else b }

I get a compile error at the calls to the methods getMax and getMin stating: [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:117: error: missing arguments for method getMax in class DatasetReader; [ERROR] follow this method with '_' if you want to treat it as a partially applied function [ERROR] maxVector = attribMap.reduceByKey( getMax ) [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:118: error: missing arguments for method getMin in class DatasetReader; [ERROR] follow this method with '_' if you want to treat it as a partially applied function [ERROR] minVector = attribMap.reduceByKey( getMin ) I am not sure what I am doing wrong here. My RDD is an RDD of pairs and, as per my knowledge, I can pass any method to reduceByKey as long as the function is of the type f : (V, V) => V. I am really stuck here. Please help.
-- Richard Marscher, Software Engineer, Localytics
Re: spark ec2 as non-root / any plan to improve that in the future ?
Quick and clear answer, thank you. 2015-07-09 21:07 GMT+02:00 Nicholas Chammas nicholas.cham...@gmail.com: No plans to change that at the moment, but agreed it is against accepted convention. It would be a lot of work to change the tool, change the AMIs, and test everything. My suggestion is not to hold your breath for such a change. spark-ec2, as far as I understand, is not intended for spinning up permanent or production infrastructure (though people may use it for those purposes), so there isn't a big impetus to fix this kind of issue. It works really well for what it was intended for: spinning up clusters for testing, prototyping, and experimenting. Nick On Thu, Jul 9, 2015 at 3:25 AM matd matd...@gmail.com wrote: Hi, Spark ec2 scripts are useful, but they install everything as root. AFAIK, that's not a good practice ;-) Why is it so? Should these scripts be reserved for test/demo purposes, and not be used for a production system? Is it planned in some roadmap to improve that, or to replace the ec2 scripts with something else? Would it be difficult to change them to use a sudoer instead?
Re: What is a best practice for passing environment variables to Spark workers?
Thanks, Akhil. We're trying the conf.setExecutorEnv() approach since we've already got environment variables set. For system properties we'd go the conf.set("spark.*") route. We were concerned that doing the below type of thing did not work, which this blog post seems to confirm (http://progexc.blogspot.com/2014/12/spark-configuration-mess-solved.html):

$SPARK_HOME/spark-submit \
  --class com.acme.Driver \
  --conf spark.executorEnv.VAR1=VAL1 \
  --conf spark.executorEnv.VAR2=VAL2 \
  ...

The code running on the workers does not see these variables. On Fri, Jul 10, 2015 at 4:03 AM, Akhil Das ak...@sigmoidanalytics.com wrote: It basically filters out everything which doesn't start with "spark" https://github.com/apache/spark/blob/658814c898bec04c31a8e57f8da0103497aac6ec/core/src/main/scala/org/apache/spark/SparkConf.scala#L314, so it is necessary to keep "spark." in the property name. Thanks Best Regards On Fri, Jul 10, 2015 at 12:06 AM, dgoldenberg dgoldenberg...@gmail.com wrote: I have about 20 environment variables to pass to my Spark workers. Even though they're in the init scripts on the Linux box, the workers don't see these variables. Does Spark do something to shield itself from what may be defined in the environment? I see multiple pieces of info on how to pass the env vars into workers and they seem dated and/or unclear. Here: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tt5780.html

SparkConf conf = new SparkConf();
conf.set("spark.myapp.myproperty", propertyValue);

OR set them in spark-defaults.conf, as in

spark.config.one value
spark.config.two value2

In another posting, http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tt3180.html :

conf.setExecutorEnv("ORACLE_HOME", "myOraHome")
conf.setExecutorEnv("SPARK_JAVA_OPTS", "-Djava.library.path=/my/custom/path")

The configuration guide talks about spark.executorEnv.[EnvironmentVariableName] -- Add the environment variable specified by EnvironmentVariableName to the Executor process. The user can specify multiple of these to set multiple environment variables. Then there are mentions of SPARK_JAVA_OPTS which seems to be deprecated (?) What is the easiest/cleanest approach here? Ideally, I'd not want to burden my driver program with explicit knowledge of all the env vars that are needed on the worker side. I'd also like to avoid having to jam them into spark-defaults.conf since they're already set in the system init scripts, so why duplicate. I suppose one approach would be to namespace all my vars to start with a well-known prefix, then cycle through the env in the driver and stuff all these variables into the Spark context. If I'm doing that, would I want to conf.set("spark.myapp.myproperty", propertyValue); and is "spark." necessary, or was that just part of the example? Or would I want to conf.setExecutorEnv("MYPREFIX_MY_VAR_1", "some-value"); Thanks.
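A hedged sketch of the prefix idea from the question (the MYAPP_ prefix is hypothetical): in the driver, copy every environment variable with a known prefix into the executor environment before creating the context:

import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
// forward every MYAPP_-prefixed variable from the driver's environment to the executors
sys.env.filterKeys(_.startsWith("MYAPP_")).foreach { case (k, v) => conf.setExecutorEnv(k, v) }
val sc = new SparkContext(conf)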
Spark Broadcasting large dataset
Hey, Guys! I am using spark for an NGS data application. In my case I have to broadcast a very big dataset to each task. However there are several tasks (say 48 tasks) running on CPUs (also 48 cores) in the same node. These tasks, which run on the same node, could share the same dataset, but spark broadcasts it 48 times (if I understand correctly). Is there a way to broadcast just one copy for each node and share it among all tasks running on that node? Much appreciated! best! huanglr
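For what it's worth, a minimal sketch of broadcast variables, which, as I understand them, ship one copy per executor JVM that all tasks in that executor share, rather than one copy per task (loadReferenceData and reads are hypothetical names):

val bigTable: Map[String, Int] = loadReferenceData()            // the large shared dataset
val bc = sc.broadcast(bigTable)                                 // stored once per executor, not once per task
val annotated = reads.map(r => (r, bc.value.getOrElse(r, 0)))   // every task on the executor reuses bc.value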
Debug Spark Streaming in PyCharm
Hello, I'm trying to debug a PySpark app with Kafka Streaming in PyCharm. However, PySpark cannot find the jar dependencies for Kafka Streaming without editing the program. I can temporarily use SparkConf to set 'spark.jars', but I'm using Mesos for production and don't want to edit my program every time I want to debug. I'd like to find a way to debug without editing the source. Here's what my PyCharm debug execution command looks like: /home/brandon/.pyenv/versions/coinspark/bin/python2.7 /opt/pycharm-community/helpers/pydev/pydevd.py --multiproc --client 127.0.0.1 --port 59042 --file /home/brandon/src/coins/coinspark/streaming.py I might be able to use spark-submit as the command PyCharm runs, but I'm not sure if that will work with the debugger. Thoughts? Cheers! Brandon Bradley
Re: Issues when combining Spark and a third party java library
I'm using hadoop 2.5.2 with spark 1.4.0 and I can also see in my logs: 15/07/09 06:39:02 DEBUG HadoopRDD: SplitLocationInfo and other new Hadoop classes are unavailable. Using the older Hadoop location info code. java.lang.ClassNotFoundException: org.apache.hadoop.mapred.InputSplitWithLocationInfo at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.spark.rdd.HadoopRDD$SplitInfoReflections.init(HadoopRDD.scala:386) at org.apache.spark.rdd.HadoopRDD$.liftedTree1$1(HadoopRDD.scala:396) at org.apache.spark.rdd.HadoopRDD$.init(HadoopRDD.scala:395) at org.apache.spark.rdd.HadoopRDD$.clinit(HadoopRDD.scala) at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:165) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:290) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$reduceByKey$3.apply(PairRDDFunctions.scala:290) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.PairRDDFunctions.reduceByKey(PairRDDFunctions.scala:289) at WordCount$.main(WordCount.scala:13) at WordCount.main(WordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) The 
application I launch through spark-submit can access data on HDFS though, and I launch the script with HADOOP_HOME set.
Fwd: SparkSQL Postgres balanced partition of DataFrames
Hi, I have a very simple setup of SparkSQL connecting to a Postgres DB and I'm trying to get a DataFrame from a table, the DataFrame with a number X of partitions (let's say 2). The code would be the following:

Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
options.put("dbtable", "select ID, OTHER from TABLE limit 1000");
options.put("partitionColumn", "ID");
options.put("lowerBound", "100");
options.put("upperBound", "500");
options.put("numPartitions", "2");
DataFrame housingDataFrame = sqlContext.read().format("jdbc").options(options).load();

For some reason, one partition of the DataFrame contains almost all rows. From what I can understand, lowerBound/upperBound are the parameters used to fine-tune this. In SparkSQL's documentation (Spark 1.4.0 - spark-sql_2.11) it says they are used to define the stride, not to filter/range the partition column. But that raises several questions: 1. Is the stride the frequency (number of elements returned by each query) with which Spark will query the DB for each executor (partition)? 2. If not, what is the purpose of these parameters, what do they depend on, and how can I balance my DataFrame partitions in a stable way (not asking that all partitions contain the same number of elements, just that there is an equilibrium - for example, for 2 partitions and 100 elements, 55/45, 60/40 or even 65/35 would do)? I can't seem to find a clear answer to these questions and was wondering if maybe some of you could clear these points up for me, because right now this is affecting my cluster performance when processing X million rows and all the heavy lifting goes to one single executor. Thank you for your time, Moises Baly
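Not an authoritative answer, but as I understand the JDBC source, lowerBound/upperBound only control how the [lowerBound, upperBound) range of partitionColumn is sliced into numPartitions pieces; they do not filter rows. A sketch of the predicates that would be generated for the settings above, with stride = (500 - 100) / 2 = 200:

// partition 0: WHERE ID < 300 OR ID IS NULL   (also receives every row below lowerBound)
// partition 1: WHERE ID >= 300                (also receives every row above upperBound)
// If most IDs fall on one side of 300, one partition gets nearly all rows, so picking
// bounds close to the actual MIN(ID) and MAX(ID) should give more balanced partitions.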
Re: Spark on Tomcat has exception IncompatibleClassChangeError: Implementing class
It looks like there is no problem with Tomcat 8. On Fri, Jul 10, 2015 at 11:12 AM, Zoran Jeremic zoran.jere...@gmail.com wrote: Hi Ted, I'm running Tomcat 7 with Java: java version 1.8.0_45 Java(TM) SE Runtime Environment (build 1.8.0_45-b14) Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode) Zoran On Fri, Jul 10, 2015 at 10:45 AM, Ted Yu yuzhih...@gmail.com wrote: What version of Java is Tomcat run ? Thanks On Jul 10, 2015, at 10:09 AM, Zoran Jeremic zoran.jere...@gmail.com wrote: Hi, I've developed maven application that uses mongo-hadoop connector to pull data from mongodb and process it using Apache spark. The whole process runs smoothly if I run it on embedded Jetty server. However, if I deploy it to Tomcat server 7, it's always interrupted at the line of code which collects data from JavaPairRDD with exception that doesn't give me any clue what the problem might be: 15/07/09 20:42:05 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58302 (size: 6.3 KB, free: 946.6 MB) 15/07/09 20:42:05 INFO spark.SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkCategoryRecommender.java:106Exception in thread Thread-6 java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at org.apache.catalina.loader.WebappClassLoader.findClassInternal(WebappClassLoader.java:2918) at org.apache.catalina.loader.WebappClassLoader.findClass(WebappClassLoader.java:1174) at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1669) at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1547) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:74) at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.newJobContext(SparkHadoopMapReduceUtil.scala:28) at org.apache.spark.rdd.NewHadoopRDD.newJobContext(NewHadoopRDD.scala:66) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:94) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.collect(RDD.scala:884) at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:335) at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:47) at com.warrantylife.iqmetrix.spark.SparkCategoryRecommender.buildDataModelOnProductid(SparkCategoryRecommender.java:149) at 
com.warrantylife.iqmetrix.spark.SparkCategoryRecommender.computeAndStoreRecommendationsForProductsInCategory(SparkCategoryRecommender.java:179) at com.warrantylife.recommendation.impl.CategoryRecommendationManagerImpl.computeRecommendationsForCategory(CategoryRecommendationManagerImpl.java:105) at com.warrantylife.testdata.TestDataGeneratorService$1.run(TestDataGeneratorService.java:55) at java.lang.Thread.run(Thread.java:745) I guess there is some library conflict happening on the Tomcat, but I have no idea where to look for problem. This is my whole dependency tree: [INFO] --- maven-dependency-plugin:2.1:tree (default-cli) @ warranty-analytics ---[INFO] org.warrantylife:warranty-analytics:war:0.1.0-SNAPSHOT[INFO] +- javax.servlet:javax.servlet-api:jar:3.0.1:provided[INFO] +- com.sun.jersey:jersey-client:jar:1.19:compile[INFO] +- com.sun.jersey:jersey-server:jar:1.19:compile[INFO] +- com.sun.jersey:jersey-core:jar:1.19:compile[INFO] | \- javax.ws.rs:jsr311-api:jar:1.1.1:compile[INFO] +- com.sun.jersey:jersey-servlet:jar:1.19:compile[INFO] +- org.apache.hadoop:hadoop-common:jar:2.4.1:compile[INFO] | +- org.apache.hadoop:hadoop-annotations:jar:2.4.1:compile[INFO] | | \- jdk.tools:jdk.tools:jar:1.6:system[INFO]
SparkHub: a new community site for Apache Spark
Hi All, Today, I'm happy to announce SparkHub (http://sparkhub.databricks.com), a service for the Apache Spark community to easily find the most relevant Spark resources on the web. SparkHub is a curated list of Spark news, videos and talks, package releases, upcoming events around the world, and a Spark Meetup directory to help you find a meetup close to you. We will continue to expand the site in the coming months and add more content. I hope SparkHub can help you find Spark related information faster and more easily than is currently possible. Everything is sourced from the Spark community, and we welcome input from you as well! - Patrick
Re: Spark on Tomcat has exception IncompatibleClassChangeError: Implementing class
What version of Java is Tomcat run ? Thanks On Jul 10, 2015, at 10:09 AM, Zoran Jeremic zoran.jere...@gmail.com wrote: Hi, I've developed maven application that uses mongo-hadoop connector to pull data from mongodb and process it using Apache spark. The whole process runs smoothly if I run it on embedded Jetty server. However, if I deploy it to Tomcat server 7, it's always interrupted at the line of code which collects data from JavaPairRDD with exception that doesn't give me any clue what the problem might be: 15/07/09 20:42:05 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58302 (size: 6.3 KB, free: 946.6 MB) 15/07/09 20:42:05 INFO spark.SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkCategoryRecommender.java:106 Exception in thread Thread-6 java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at org.apache.catalina.loader.WebappClassLoader.findClassInternal(WebappClassLoader.java:2918) at org.apache.catalina.loader.WebappClassLoader.findClass(WebappClassLoader.java:1174) at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1669) at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1547) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:74) at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.newJobContext(SparkHadoopMapReduceUtil.scala:28) at org.apache.spark.rdd.NewHadoopRDD.newJobContext(NewHadoopRDD.scala:66) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:94) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.collect(RDD.scala:884) at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:335) at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:47) at com.warrantylife.iqmetrix.spark.SparkCategoryRecommender.buildDataModelOnProductid(SparkCategoryRecommender.java:149) at com.warrantylife.iqmetrix.spark.SparkCategoryRecommender.computeAndStoreRecommendationsForProductsInCategory(SparkCategoryRecommender.java:179) at com.warrantylife.recommendation.impl.CategoryRecommendationManagerImpl.computeRecommendationsForCategory(CategoryRecommendationManagerImpl.java:105) at com.warrantylife.testdata.TestDataGeneratorService$1.run(TestDataGeneratorService.java:55) at java.lang.Thread.run(Thread.java:745) I guess there is 
some library conflict happening on the Tomcat, but I have no idea where to look for problem. This is my whole dependency tree: [INFO] --- maven-dependency-plugin:2.1:tree (default-cli) @ warranty-analytics --- [INFO] org.warrantylife:warranty-analytics:war:0.1.0-SNAPSHOT [INFO] +- javax.servlet:javax.servlet-api:jar:3.0.1:provided [INFO] +- com.sun.jersey:jersey-client:jar:1.19:compile [INFO] +- com.sun.jersey:jersey-server:jar:1.19:compile [INFO] +- com.sun.jersey:jersey-core:jar:1.19:compile [INFO] | \- javax.ws.rs:jsr311-api:jar:1.1.1:compile [INFO] +- com.sun.jersey:jersey-servlet:jar:1.19:compile [INFO] +- org.apache.hadoop:hadoop-common:jar:2.4.1:compile [INFO] | +- org.apache.hadoop:hadoop-annotations:jar:2.4.1:compile [INFO] | | \- jdk.tools:jdk.tools:jar:1.6:system [INFO] | +- com.google.guava:guava:jar:11.0.2:compile [INFO] | +- commons-cli:commons-cli:jar:1.2:compile [INFO] | +- xmlenc:xmlenc:jar:0.52:compile [INFO] | +- commons-httpclient:commons-httpclient:jar:3.1:compile [INFO] | +- commons-codec:commons-codec:jar:1.4:compile [INFO] | +- commons-io:commons-io:jar:2.4:compile [INFO] | +- commons-net:commons-net:jar:3.1:compile
Spark Streaming - Inserting into Tables
Why does this not work? Is insert into broken in 1.3.1? It does not throw any errors, fail, or throw exceptions. It simply does not work.

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet")
val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet")
parquetFile.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}
ssc.start()

Or this?

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory")
val day = sqlContext.jsonFile("s3://textFileDirectory")
day.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.registerTempTable("tmp_rideaccepted")
  sqlContext.sql("insert into table rideaccepted select * from tmp_rideaccepted")
}
ssc.start()

or this?

val ssc = new StreamingContext(sc, Minutes(10))
val currentStream = ssc.textFileStream("s3://textFileDirectory/")
val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/")
dayBefore.registerTempTable("rideaccepted")
currentStream.foreachRDD { rdd =>
  val df = sqlContext.jsonRDD(rdd)
  df.insertInto("rideaccepted")
}
ssc.start()
Unit tests of spark application
Hi, I want to write junit test cases in scala for testing spark application. Is there any guide or link which I can refer. Thank you very much. -Naveen
Re: Spark on Tomcat has exception IncompatibleClassChangeError: Implementing class
Hi Ted, I'm running Tomcat 7 with Java: java version 1.8.0_45 Java(TM) SE Runtime Environment (build 1.8.0_45-b14) Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode) Zoran On Fri, Jul 10, 2015 at 10:45 AM, Ted Yu yuzhih...@gmail.com wrote: What version of Java is Tomcat run ? Thanks On Jul 10, 2015, at 10:09 AM, Zoran Jeremic zoran.jere...@gmail.com wrote: Hi, I've developed maven application that uses mongo-hadoop connector to pull data from mongodb and process it using Apache spark. The whole process runs smoothly if I run it on embedded Jetty server. However, if I deploy it to Tomcat server 7, it's always interrupted at the line of code which collects data from JavaPairRDD with exception that doesn't give me any clue what the problem might be: 15/07/09 20:42:05 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58302 (size: 6.3 KB, free: 946.6 MB) 15/07/09 20:42:05 INFO spark.SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkCategoryRecommender.java:106Exception in thread Thread-6 java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at org.apache.catalina.loader.WebappClassLoader.findClassInternal(WebappClassLoader.java:2918) at org.apache.catalina.loader.WebappClassLoader.findClass(WebappClassLoader.java:1174) at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1669) at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1547) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:74) at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.newJobContext(SparkHadoopMapReduceUtil.scala:28) at org.apache.spark.rdd.NewHadoopRDD.newJobContext(NewHadoopRDD.scala:66) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:94) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.collect(RDD.scala:884) at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:335) at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:47) at com.warrantylife.iqmetrix.spark.SparkCategoryRecommender.buildDataModelOnProductid(SparkCategoryRecommender.java:149) at com.warrantylife.iqmetrix.spark.SparkCategoryRecommender.computeAndStoreRecommendationsForProductsInCategory(SparkCategoryRecommender.java:179) at 
com.warrantylife.recommendation.impl.CategoryRecommendationManagerImpl.computeRecommendationsForCategory(CategoryRecommendationManagerImpl.java:105) at com.warrantylife.testdata.TestDataGeneratorService$1.run(TestDataGeneratorService.java:55) at java.lang.Thread.run(Thread.java:745) I guess there is some library conflict happening on the Tomcat, but I have no idea where to look for problem. This is my whole dependency tree: [INFO] --- maven-dependency-plugin:2.1:tree (default-cli) @ warranty-analytics ---[INFO] org.warrantylife:warranty-analytics:war:0.1.0-SNAPSHOT[INFO] +- javax.servlet:javax.servlet-api:jar:3.0.1:provided[INFO] +- com.sun.jersey:jersey-client:jar:1.19:compile[INFO] +- com.sun.jersey:jersey-server:jar:1.19:compile[INFO] +- com.sun.jersey:jersey-core:jar:1.19:compile[INFO] | \- javax.ws.rs:jsr311-api:jar:1.1.1:compile[INFO] +- com.sun.jersey:jersey-servlet:jar:1.19:compile[INFO] +- org.apache.hadoop:hadoop-common:jar:2.4.1:compile[INFO] | +- org.apache.hadoop:hadoop-annotations:jar:2.4.1:compile[INFO] | | \- jdk.tools:jdk.tools:jar:1.6:system[INFO] | +- com.google.guava:guava:jar:11.0.2:compile[INFO] | +- commons-cli:commons-cli:jar:1.2:compile[INFO] | +-
Re: Unit tests of spark application
Unless you had something specific in mind, it should be as simple as creating a SparkContext object using a master of local[2] in your tests. On Fri, Jul 10, 2015 at 1:41 PM, Naveen Madhire vmadh...@umail.iu.edu wrote: Hi, I want to write JUnit test cases in Scala for testing a Spark application. Is there any guide or link which I can refer to? Thank you very much. -Naveen -- Richard Marscher, Software Engineer, Localytics
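A minimal sketch of that suggestion, assuming ScalaTest (class and test names are hypothetical; a plain JUnit runner works the same way):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class WordCountSpec extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _
  override def beforeAll(): Unit = {
    // local[2] runs a two-thread local "cluster" inside the test JVM
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test"))
  }
  override def afterAll(): Unit = sc.stop()   // always release the context between suites
  test("counts words") {
    val counts = sc.parallelize(Seq("a", "b", "a")).map((_, 1)).reduceByKey(_ + _).collectAsMap()
    assert(counts("a") === 2)
  }
}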
Re: Pyspark not working on yarn-cluster mode
Yes, you can launch (from Java code) pyspark scripts with yarn-cluster mode without using the spark-submit script. Check the SparkLauncher code at this link https://github.com/apache/spark/tree/master/launcher/src/main/java/org/apache/spark/launcher . SparkLauncher is not dependent on Spark core jars, so it is very easy to integrate it into your project. Code example for launching a Spark job without the spark-submit script:

Process spark = new SparkLauncher().setSparkHome(path_to_spark)
    .setAppName(pythonScriptName).setMaster("yarn-cluster")
    .setAppResource(sparkScriptPath.toString()).addAppArgs(params)
    .addPyFile(otherPythonScriptPath.toString()).launch();

But in order to correctly handle the python path addition of 3rd party packages, which Marcelo has implemented in patch SPARK-5479 https://issues.apache.org/jira/browse/SPARK-5479, download the latest source code of Spark and build it yourself with maven. Other pre-built Spark versions do not include that patch. On Fri, Jul 10, 2015 at 9:52 AM, Sandy Ryza sandy.r...@cloudera.com wrote: To add to this, conceptually, it makes no sense to launch something in yarn-cluster mode by creating a SparkContext on the client - the whole point of yarn-cluster mode is that the SparkContext runs on the cluster, not on the client. On Thu, Jul 9, 2015 at 2:35 PM, Marcelo Vanzin van...@cloudera.com wrote: You cannot run Spark in cluster mode by instantiating a SparkContext like that. You have to launch it with the spark-submit command line script. On Thu, Jul 9, 2015 at 2:23 PM, jegordon jgordo...@gmail.com wrote: Hi to all, Is there any way to run pyspark scripts with yarn-cluster mode without using the spark-submit script? I need it this way because I will integrate this code into a django web app. When I try to run any script in yarn-cluster mode I get the following error: org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit. I'm creating the sparkContext in the following way:

conf = (SparkConf().setMaster("yarn-cluster").setAppName("DataFrameTest"))
sc = SparkContext(conf = conf)
#Dataframe code

Thanks -- Marcelo -- Best regards, Elkhan Dadashov
Re: Starting Spark-Application without explicit submission to cluster?
Hi Jan, Most SparkContext constructors are there for legacy reasons. The point of going through spark-submit is to set up all the classpaths, system properties, and resolve URIs properly with respect to the deployment mode. For instance, jars are distributed differently between YARN cluster mode and standalone client mode, and this is not something the Spark user should have to worry about. As an example, if you pass jars through the SparkContext constructor, it won't actually work in cluster mode if the jars are local. This is because the driver is launched on the cluster and the SparkContext will try to find the jars on the cluster in vain. So the more concise answer to your question is: yes, technically you don't need to go through spark-submit, but you'll have to deal with all the bootstrapping complexity yourself. -Andrew 2015-07-10 3:37 GMT-07:00 algermissen1971 algermissen1...@icloud.com: Hi, I am a bit confused about the steps I need to take to start a Spark application on a cluster. So far I had the impression from the documentation that I need to explicitly submit the application using, for example, spark-submit. However, from the SparkContext constructor signature I get the impression that maybe I do not have to do that after all: In http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.SparkContext the first constructor has (among other things) a parameter 'jars' which indicates the Collection of JARs to send to the cluster. To me this suggests that I can simply start the application anywhere and that it will deploy itself to the cluster in the same way a call to spark-submit would. Is that correct? If not, can someone explain why I can / need to provide master and jars etc. in the call to SparkContext, because they essentially only duplicate what I would specify in the call to spark-submit. Jan
Spark on Tomcat has exception IncompatibleClassChangeError: Implementing class
Hi, I've developed maven application that uses mongo-hadoop connector to pull data from mongodb and process it using Apache spark. The whole process runs smoothly if I run it on embedded Jetty server. However, if I deploy it to Tomcat server 7, it's always interrupted at the line of code which collects data from JavaPairRDD with exception that doesn't give me any clue what the problem might be: 15/07/09 20:42:05 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:58302 (size: 6.3 KB, free: 946.6 MB) 15/07/09 20:42:05 INFO spark.SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkCategoryRecommender.java:106Exception in thread Thread-6 java.lang.IncompatibleClassChangeError: Implementing class at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:760) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at org.apache.catalina.loader.WebappClassLoader.findClassInternal(WebappClassLoader.java:2918) at org.apache.catalina.loader.WebappClassLoader.findClass(WebappClassLoader.java:1174) at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1669) at org.apache.catalina.loader.WebappClassLoader.loadClass(WebappClassLoader.java:1547) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:264) at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.firstAvailableClass(SparkHadoopMapReduceUtil.scala:74) at org.apache.spark.mapreduce.SparkHadoopMapReduceUtil$class.newJobContext(SparkHadoopMapReduceUtil.scala:28) at org.apache.spark.rdd.NewHadoopRDD.newJobContext(NewHadoopRDD.scala:66) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:94) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.rdd.RDD.withScope(RDD.scala:286) at org.apache.spark.rdd.RDD.collect(RDD.scala:884) at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:335) at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:47) at com.warrantylife.iqmetrix.spark.SparkCategoryRecommender.buildDataModelOnProductid(SparkCategoryRecommender.java:149) at com.warrantylife.iqmetrix.spark.SparkCategoryRecommender.computeAndStoreRecommendationsForProductsInCategory(SparkCategoryRecommender.java:179) at com.warrantylife.recommendation.impl.CategoryRecommendationManagerImpl.computeRecommendationsForCategory(CategoryRecommendationManagerImpl.java:105) at com.warrantylife.testdata.TestDataGeneratorService$1.run(TestDataGeneratorService.java:55) at java.lang.Thread.run(Thread.java:745) I guess there is some library conflict happening on the Tomcat, but I have no idea where to look for problem. 
This is my whole dependency tree: [INFO] --- maven-dependency-plugin:2.1:tree (default-cli) @ warranty-analytics ---[INFO] org.warrantylife:warranty-analytics:war:0.1.0-SNAPSHOT[INFO] +- javax.servlet:javax.servlet-api:jar:3.0.1:provided[INFO] +- com.sun.jersey:jersey-client:jar:1.19:compile[INFO] +- com.sun.jersey:jersey-server:jar:1.19:compile[INFO] +- com.sun.jersey:jersey-core:jar:1.19:compile[INFO] | \- javax.ws.rs:jsr311-api:jar:1.1.1:compile[INFO] +- com.sun.jersey:jersey-servlet:jar:1.19:compile[INFO] +- org.apache.hadoop:hadoop-common:jar:2.4.1:compile[INFO] | +- org.apache.hadoop:hadoop-annotations:jar:2.4.1:compile[INFO] | | \- jdk.tools:jdk.tools:jar:1.6:system[INFO] | +- com.google.guava:guava:jar:11.0.2:compile[INFO] | +- commons-cli:commons-cli:jar:1.2:compile[INFO] | +- xmlenc:xmlenc:jar:0.52:compile[INFO] | +- commons-httpclient:commons-httpclient:jar:3.1:compile[INFO] | +- commons-codec:commons-codec:jar:1.4:compile[INFO] | +- commons-io:commons-io:jar:2.4:compile[INFO] | +- commons-net:commons-net:jar:3.1:compile[INFO] | +- commons-collections:commons-collections:jar:3.2.1:compile[INFO] | +- javax.servlet:servlet-api:jar:2.5:compile[INFO] | +- org.mortbay.jetty:jetty:jar:6.1.26:compile[INFO] | +- org.mortbay.jetty:jetty-util:jar:6.1.26:compile[INFO] |
Re: PySpark without PySpark
Hi Ashish, Cool, glad it worked out. I have only used Spark clusters on EC2, which I spin up using the spark-ec2 scripts (part of the Spark downloads), so I don't have any experience setting up in-house clusters like you want to do. But I found some documentation here that may be helpful: https://docs.sigmoidanalytics.com/index.php/Installing_Spark_and_Setting_Up_Your_Cluster#Deploying_set_of_machines_over_SSH There are other options as well in this document that will require you to know some other tools like Chef (previous sections). Good luck, Sujit On Thu, Jul 9, 2015 at 10:25 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi Sujit, Thank you for your time to help me out. And special thank you for your elaborate steps. 1) I corrected SPARK_HOME to be C:\spark-1.3.0 2) I installed py4j from the anaconda command prompt and the command you gave executed successfully. 3) I replaced python27 with python in the 00-setup script. I now give the path variables as defined, and the PATH: SPARK_HOME C:\Spark-1.3.0 ; JAVA_HOME C:\Program Files\Java\jdk1.7.0_79 ; PYTHONPATH C:\Users\Ashish Dutt\Anaconda ; MAVEN_HOME C:\Maven\bin ; SBT_HOME C:\SBT ; PATH %JAVA_HOME%\BIN; %PYTHON_PATH%; %HADOOP_HOME%\BIN; %SPARK_HOME%; %M2_HOME%\BIN; %MAVEN_HOME%\BIN; %SBT_HOME%; 4) This time, I grabbed my baseball bat (you do know why..), invoked ipython notebook again and with the other free hand I slowly typed the command print SPARK_HOME -- it worked. Then another command, from pyspark import SparkContext, and it worked too!!! The baseball bat dropped to the ground and I quickly jabbed the other commands given in the post. Attached is the screenshot and it all worked... EUREKA... Sujit, a quintal of thanks for your persistence in helping me resolve this problem. You have been very helpful and I wish you luck and success in all your endeavors. The next milestone is to get this to work in a cluster environment. I am confused: do I need to install spark-1.3.0 on all the 4 linux machines that make up my cluster? The goal is to use my laptop as a client (from where I will submit spark commands to the master server). The master can then distribute the job to the three nodes and provide the client with the end result. Am I correct in this visualization? Once again, thank you for your efforts. Sincerely, Ashish Dutt PhD Candidate Department of Information Systems University of Malaya, Lembah Pantai, 50603 Kuala Lumpur, Malaysia On Fri, Jul 10, 2015 at 11:48 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Ashish, Julian's approach is probably better, but a few observations: 1) Your SPARK_HOME should be C:\spark-1.3.0 (not C:\spark-1.3.0\bin). 2) If you have anaconda python installed (I saw that you had set this up in a separate thread), py4j should be part of the package - at least I think so. To test this, try in your python repl: from py4j.java_gateway import JavaGateway; if it succeeds, you already have it. 3) In case Py4J is not installed, the best way to install a new package is using easy_install or pip. Make sure your path is set up so when you call python you are calling the anaconda version (in case you have multiple python versions); if so, do easy_install py4j - this will install py4j correctly without any messing around on your part. Install instructions for py4j are available on their site: http://py4j.sourceforge.net/install.html 4) You should replace the python2 in your 00-setup-script with python, so you point to the $SPARK_HOME/python directory (C:\spark-1.3.0\python).
-sujit On Thu, Jul 9, 2015 at 8:26 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hello Sujit, Many thanks for your response. To answer your questions: Q1) Do you have SPARK_HOME set up in your environment? - Yes, I do. It is SPARK_HOME=C:/spark-1.3.0/bin Q2) Is there a python2 or python subdirectory under the root of your Spark installation? - Yes, I do have that too. It is called python. To fix this problem, this is what I did: I downloaded py4j-0.8.2.1-src from here https://pypi.python.org/pypi/py4j which was not there initially when I downloaded the spark package from the official repository. I then put it in the lib directory, C:\spark-1.3.0\python\lib, of the pyspark folder in the spark-1.3.0 root folder. Note I did not extract the zip file; I put it in as it is. What I did next was copy this file and put it in the pythonpath, so my python path now reads as PYTHONPATH=C:/Python27/ I then rebooted the computer and said a silent prayer :-) Then I opened the command prompt and invoked the command pyspark from the bin directory of spark and EUREKA, it worked :-) Attached is the screenshot for the same. Now, the problem is with IPython notebook. I cannot get it to work with pySpark. I have a cluster with 4 nodes using CDH 5.4. I was able to resolve the problem. Now the next challenge was to configure it with IPython. Followed the steps as documented
SparkDriverExecutionException when using actorStream
Hi, I'm trying to create a Spark Streaming actor stream but I'm having several problems. First of all, the guide from https://spark.apache.org/docs/latest/streaming-custom-receivers.html refers to the code https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala, which uses AkkaUtils and org.apache.spark.SecurityManager, which are now private[spark]. So I've tried the example from http://www.typesafe.com/activator/template/spark-streaming-scala-akka, but I get the following exception as soon as I store some data in Spark Streaming: org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1025) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Caused by: java.lang.ArrayStoreException: [Ljava.lang.Object; at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1750) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1750) at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1021) ... 3 more My code is basically the same as in that example, and it is available at https://gist.github.com/juanrh/139af20fd2060cb1a9d1 . If I comment out receiverActor ! msg then there is no exception, but also no data is received in the stream. Any thoughts on this? Thanks a lot for your help. Greetings, Juan
Re: spark-submit
Hi Ashutosh, I believe the class is org.apache.spark.examples.graphx.Analytics. If you're running page rank on LiveJournal you could just use org.apache.spark.examples.graphx.LiveJournalPageRank. -Andrew

2015-07-10 3:42 GMT-07:00 AshutoshRaghuvanshi ashutosh.raghuvans...@gmail.com: When I run this command:

ashutosh@pas-lab-server7:~/spark-1.4.0$ ./bin/spark-submit \
  --class org.apache.spark.graphx.lib.Analytics \
  --master spark://172.17.27.12:7077 \
  assembly/target/scala-2.10/spark-assembly-1.4.0-hadoop2.2.0.jar \
  pagerank soc-LiveJournal1.txt --numEPart=100 --nverts=4847571 --numIter=10 --partStrategy=EdgePartition2D

I get an error:

java.lang.ClassNotFoundException: org.apache.spark.graphx.lib.Analytics
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:274)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:633)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/10 15:31:35 INFO Utils: Shutdown hook called

Where is this class? What path should I give?
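For reference, a corrected launch along the lines Andrew suggests might look like the sketch below. The examples jar path assumes a source build of 1.4.0 and will differ for a prebuilt package (look under lib/), and the option list is carried over from the original post, so it is worth checking it against the usage message LiveJournalPageRank prints. Since LiveJournalPageRank hardcodes the pagerank entry point, the leading pagerank argument is dropped:

./bin/spark-submit \
  --class org.apache.spark.examples.graphx.LiveJournalPageRank \
  --master spark://172.17.27.12:7077 \
  examples/target/scala-2.10/spark-examples-1.4.0-hadoop2.2.0.jar \
  soc-LiveJournal1.txt --numEPart=100 --numIter=10 --partStrategy=EdgePartition2D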
SparkR Error in sparkR.init(master="local") in RStudio
I have installed the SparkR package from the Spark distribution into my R library. I can call the following command and it seems to work properly: library(SparkR) However, when I try to get the Spark context using sc <- sparkR.init(master="local") it fails after some time with the following message: Error in sparkR.init(master = "local") : JVM is not ready after 10 seconds I have set JAVA_HOME, and I have a working RStudio where I can access other packages like ggplot2. I don't know why it is not working, and I don't even know where to start investigating the issue.
dataFrame.coalesce(1) or dataFrame.repartition(1) does not seem to work for me
Hi, I have a Hive INSERT INTO query which creates new Hive partitions. I have two Hive partition columns named server and date. I execute the insert using the following code and then try to save the result:

DataFrame dframe = hiveContext.sql("insert into summary1 partition(server='a1', date='2015-05-22') select * from sourcetbl bla bla");
// the above query creates orc files at /user/db/a1/20-05-22

I want only one part-0 file at the end of the above query, so I tried the following and none of them worked:

dframe.coalesce(1).write().format("orc").mode(SaveMode.Overwrite).saveAsTable("summary1");
dframe.repartition(1).write().format("orc").mode(SaveMode.Overwrite).saveAsTable("summary1");
dframe.coalesce(1).write().format("orc").save("/user/db/a1/20-05-22", SaveMode.Overwrite);
dframe.repartition(1).write().format("orc").save("/user/db/a1/20-05-22", SaveMode.Overwrite);

No matter whether I use coalesce or repartition, the query creates around 200 files at /user/db/a1/20-05-22. I was thinking that if I call coalesce(1) then it will create one final part file. Am I wrong? Please guide. Thanks in advance.
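A likely explanation, sketched in Scala under the assumption that the table and path names in the post are accurate: the INSERT writes its files at the moment hiveContext.sql() executes it, so coalescing the DataFrame returned by that statement has no effect on files that are already on disk. The coalesce has to be applied to the data before it is written:

import org.apache.spark.sql.SaveMode

// select the data the INSERT was producing (query reconstructed
// from the post, so treat it as a placeholder)
val src = hiveContext.sql("select * from sourcetbl bla bla")

src.coalesce(1)                 // one partition => one part file
  .write
  .format("orc")
  .mode(SaveMode.Overwrite)
  .save("/user/db/a1/20-05-22")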
Re: Pyspark not working on yarn-cluster mode
To add to this: conceptually, it makes no sense to launch something in yarn-cluster mode by creating a SparkContext on the client - the whole point of yarn-cluster mode is that the SparkContext runs on the cluster, not on the client.

On Thu, Jul 9, 2015 at 2:35 PM, Marcelo Vanzin van...@cloudera.com wrote: You cannot run Spark in cluster mode by instantiating a SparkContext like that. You have to launch it with the spark-submit command line script.

On Thu, Jul 9, 2015 at 2:23 PM, jegordon jgordo...@gmail.com wrote: Hi to all, Is there any way to run pyspark scripts in yarn-cluster mode without using the spark-submit script? I need it this way because I will integrate this code into a django web app. When I try to run any script in yarn-cluster mode I get the following error:

org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.

I'm creating the sparkContext in the following way:

conf = (SparkConf()
        .setMaster("yarn-cluster")
        .setAppName("DataFrameTest"))
sc = SparkContext(conf = conf)
# Dataframe code

Thanks
-- Marcelo
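For completeness, the launch path Marcelo points to looks roughly like the line below; the script name is hypothetical, and a web app such as the django one mentioned can shell out to it (for example via subprocess) instead of constructing a SparkContext in-process:

./bin/spark-submit --master yarn-cluster dataframe_test.py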
Re: Unit tests of spark application
On Fri, Jul 10, 2015 at 1:41 PM, Naveen Madhire vmadh...@umail.iu.edu wrote: I want to write junit test cases in scala for testing a spark application. Is there any guide or link to which I can refer? https://spark.apache.org/docs/latest/programming-guide.html#unit-testing Typically I create test data using SparkContext.parallelize and then call RDD.collect to get the results to assert.
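For reference, the pattern Daniel describes sketched with ScalaTest; the class and test names are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class WordLengthSuite extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    // local[2] keeps the test self-contained; no cluster needed
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test"))
  }

  override def afterAll(): Unit = {
    sc.stop()
  }

  test("word lengths are computed") {
    val input = sc.parallelize(Seq("spark", "streaming"))
    val result = input.map(_.length).collect()
    assert(result.sorted.toSeq === Seq(5, 9))
  }
}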
Re: [Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?
Metastore configuration should be set in hive-site.xml.

On Thu, Jul 9, 2015 at 8:59 PM, Terry Hole hujie.ea...@gmail.com wrote: Hi, I am trying to set the hive metastore destination to a mysql database in hive context. It works fine in spark 1.3.1, but it seems broken in spark 1.4.1-rc1, where it always connects to the default metastore (local). Is this a regression, or must we set the connection in hive-site.xml? The code is very simple in spark shell:

import org.apache.spark.sql.hive._
val hiveContext = new HiveContext(sc)
hiveContext.setConf("javax.jdo.option.ConnectionDriveName", "com.mysql.jdbc.Driver")
hiveContext.setConf("javax.jdo.option.ConnectionUserName", "hive")
hiveContext.setConf("javax.jdo.option.ConnectionPassword", "hive")
hiveContext.setConf("javax.jdo.option.ConnectionURL", "jdbc:mysql://10.111.3.186:3306/hive")
hiveContext.setConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
hiveContext.sql("select * from mysqltable").show()

Thanks! -Terry
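For reference, the hive-site.xml route the reply suggests would look roughly like the sketch below, with values copied from the post and the file placed in Spark's conf/ directory. Note that the standard JDO property is ConnectionDriverName; the post's setConf call uses ConnectionDriveName, missing an "r":

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://10.111.3.186:3306/hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive</value>
  </property>
</configuration>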
Re: Unit tests of spark application
Somewhat biased of course, but you can also use spark-testing-base from spark-packages.org as a basis for your unit tests. On Fri, Jul 10, 2015 at 12:03 PM, Daniel Siegmann daniel.siegm...@teamaol.com wrote: On Fri, Jul 10, 2015 at 1:41 PM, Naveen Madhire vmadh...@umail.iu.edu wrote: I want to write junit test cases in scala for testing a spark application. Is there any guide or link to which I can refer? https://spark.apache.org/docs/latest/programming-guide.html#unit-testing Typically I create test data using SparkContext.parallelize and then call RDD.collect to get the results to assert. -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
RE: Spark Broadcasting large dataset
When you say tasks, do you mean different applications, or different tasks in the same application? If it's the same program, they should be able to share the broadcast value. But given you're asking the question, I imagine they're separate, and in that case, afaik, the answer is no. You might look into putting the data into a fast store like Cassandra - that might help depending on your use case. Cheers, Ashic.

Date: Fri, 10 Jul 2015 15:52:42 +0200 From: huan...@cebitec.uni-bielefeld.de To: user@spark.apache.org Subject: Spark Broadcasting large dataset Hey, guys! I am using spark for an NGS data application. In my case I have to broadcast a very big dataset to each task. However, there are several tasks (say 48) running on cpus (also 48 cores) in the same node. These tasks, which run on the same node, could share the same dataset, but spark broadcasts it 48 times (if I understand correctly). Is there a way to broadcast just one copy to each node and share it across all tasks running on that node? Much appreciated! best! huanglr
Re: RE: Spark Broadcasting large dataset
Hi Ashic, Thank you very much for your reply! The tasks I mention are instances of a Function that I implemented with the Spark API and passed to each partition of an RDD. Within the Function I reference a big broadcast variable that is queried by each partition. So, when I am running on a 48-core slave node, I have 48 partitions corresponding to 48 tasks (or closures), where each task gets a copy of the broadcast value (I see this from the memory usage and the API doc). Is there a way to share the value across all 48 partitions of the 48 tasks? best! huanglr

From: Ashic Mahtab Date: 2015-07-10 17:02 To: huanglr; Apache Spark Subject: RE: Spark Broadcasting large dataset When you say tasks, do you mean different applications, or different tasks in the same application? If it's the same program, they should be able to share the broadcast value. But given you're asking the question, I imagine they're separate, and in that case, afaik, the answer is no. You might look into putting the data into a fast store like Cassandra - that might help depending on your use case. Cheers, Ashic. Date: Fri, 10 Jul 2015 15:52:42 +0200 From: huan...@cebitec.uni-bielefeld.de To: user@spark.apache.org Subject: Spark Broadcasting large dataset Hey, guys! I am using spark for an NGS data application. In my case I have to broadcast a very big dataset to each task. However, there are several tasks (say 48) running on cpus (also 48 cores) in the same node. These tasks, which run on the same node, could share the same dataset, but spark broadcasts it 48 times (if I understand correctly). Is there a way to broadcast just one copy to each node and share it across all tasks running on that node? Much appreciated! best! huanglr
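For reference, broadcast variables are shipped once per executor JVM and shared by every task running in it; if each of the 48 tasks appears to hold its own copy, the usual cause is that the large value was captured in the task closure instead of being read through the Broadcast handle. A minimal Scala sketch of the intended pattern, where the data and paths are hypothetical stand-ins for the NGS dataset:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("broadcast-sketch"))

// hypothetical stand-in for the big reference dataset
val bigDataset: Map[String, String] = Map("read1" -> "annotation1")

// shipped once per executor, not once per task
val bcData = sc.broadcast(bigDataset)

val annotated = sc.textFile("hdfs:///input").mapPartitions { iter =>
  // bcData.value resolves to the executor-local copy, shared by all
  // tasks running in this JVM
  val ref = bcData.value
  iter.map(record => record + "\t" + ref.getOrElse(record, "NA"))
}
annotated.saveAsTextFile("hdfs:///output")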
Re: Issues when combining Spark and a third party java library
Also, it's worth noting that I'm using the prebuilt version for hadoop 2.4 and higher from the official website.
Re: Unit tests of spark application
I can +1 Holden's spark-testing-base package. Burak On Fri, Jul 10, 2015 at 12:23 PM, Holden Karau hol...@pigscanfly.ca wrote: Somewhat biased of course, but you can also use spark-testing-base from spark-packages.org as a basis for your unit tests. On Fri, Jul 10, 2015 at 12:03 PM, Daniel Siegmann daniel.siegm...@teamaol.com wrote: On Fri, Jul 10, 2015 at 1:41 PM, Naveen Madhire vmadh...@umail.iu.edu wrote: I want to write junit test cases in scala for testing a spark application. Is there any guide or link to which I can refer? https://spark.apache.org/docs/latest/programming-guide.html#unit-testing Typically I create test data using SparkContext.parallelize and then call RDD.collect to get the results to assert. -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
Spark Streaming and using Swift object store for checkpointing
Hi, initially today, when moving my streaming application to the cluster for the first time, I ran into the newbie error of using a local file system for checkpointing and got the RDD partition count difference exception (see below). Having neither HDFS nor S3 (and the Cassandra connector not yet supporting checkpointing [1]), I turned to Swift (which is already available in our architecture). I mounted Swift using cloudfuse [2]. I see the checkpoint files on all three cluster nodes - but the job still fails with the mentioned exception. I experimented with cloudfuse caching settings but that does not *seem* to help. Can anyone shed some light on this issue and provide a hint as to what I might be doing wrong here? Jan

[1] https://datastax-oss.atlassian.net/browse/SPARKC-13
[2] https://github.com/redbo/cloudfuse

Exception:
org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[72] at print at App.scala:47(0) has different number of partitions than original RDD MapPartitionsRDD[70] at updateStateByKey at App.scala:47(2)
  at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:103)
  at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply$mcV$sp(RDD.scala:1538)
  at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1535)
  at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1535)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
  at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1534)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1735)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1750)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1765)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1272)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
  at org.apac
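For context, recovering checkpointed state requires that the driver and every node see an identical, HDFS-compatible view of the checkpoint directory, and a fuse mount that caches or lists files inconsistently can break that, which would be consistent with the partition count mismatch above. A minimal Scala sketch of the standard checkpoint-and-recover pattern; the mount path and the socket source are assumptions, not taken from the post:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// hypothetical shared location that all nodes see identically
val checkpointDir = "/mnt/swift/checkpoints"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("streaming-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
  val counts = lines.map(word => (word, 1)).updateStateByKey[Int](
    (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0)))
  counts.print()
  ssc
}

// on restart, state is rebuilt from the checkpoint instead of recomputed
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()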
SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder
Hi, My spark job runs without error, but once it completes I get this message, and the app is logged as an incomplete application in my spark-history:

SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

To fix the issue, I downloaded slf4j-simple-1.7.12.jar and included it in the class path. But when I do that I get "Multiple bindings were found on the class path", where the class paths point to spark-assembly-1.3.1-hadoop2.6.0.jar and the slf4j-simple-1.7.12.jar file. Any ideas? Thanks,
JAR containing org.apache.hadoop.mapreduce.lib.input.FileInputFormat
Sorry, this is only indirectly Spark-related. I've been attempting to create a .NET proxy for spark-core using JNI4NET. At the moment I'm stuck with the following error when running the proxy generator: java.lang.NoClassDefFoundError: org.apache.hadoop.mapreduce.lib.input.FileInputFormat I've resolved similar issues by finding the appropriate JAR and adding it to the classpath, but so far I haven't been able to figure out where this one comes from. Does anyone know what JAR this is from? It's not hadoop-common, hadoop-mapreduce-client-core, hadoop-mapreduce-client-common, hadoop-mapreduce-client-app, or hadoop-mapreduce-client-jobclient. Thanks, -Lincoln
Re: SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder
No. Works perfectly.

On Fri, Jul 10, 2015 at 3:38 PM, liangdianpeng liangdianp...@vip.163.com wrote: What if the class inside the spark_XXX.jar was damaged? (Sent from NetEase Mail for mobile)

On 2015-07-11 06:13, Mulugeta Mammo mulugeta.abe...@gmail.com wrote: Hi, My spark job runs without error, but once it completes I get this message, and the app is logged as an incomplete application in my spark-history:

SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

To fix the issue, I downloaded slf4j-simple-1.7.12.jar and included it in the class path. But when I do that I get "Multiple bindings were found on the class path", where the class paths point to spark-assembly-1.3.1-hadoop2.6.0.jar and the slf4j-simple-1.7.12.jar file. Any ideas? Thanks,
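For what it's worth, the usual cure for the multiple-bindings warning is to keep exactly one SLF4J binding on the runtime classpath rather than adding slf4j-simple next to the Spark assembly. If the job is built with sbt (an assumption; the posts don't say how it is built), the exclusion might be sketched like this:

// build.sbt sketch: lean on the binding already bundled in the Spark
// assembly and keep competing bindings out of the application jar.
// "com.example" % "some-lib" is a hypothetical dependency that pulls
// in slf4j-simple.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
  ("com.example" % "some-lib" % "1.0").exclude("org.slf4j", "slf4j-simple")
)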