1.4.1 in production
Hi, does somebody already use version 1.4.1 in production? Any problems? Thanks in advance.
Re: use S3-Compatible Storage with spark
Not in the URI, but you can specify it in the Hadoop configuration:

  <property>
    <name>fs.s3a.endpoint</name>
    <description>AWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region (s3.amazonaws.com) is assumed.</description>
  </property>

Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com wrote:
I want to use Pithos. Where can I specify that endpoint? Is it possible in the URL?

2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
Could you name the storage service that you are using? Most of them provide an S3-like REST API endpoint for you to hit.
Thanks
Best Regards

On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com wrote:
Hi, I wonder how to use S3-compatible storage in Spark. If I'm using the s3n:// URL scheme, then it will point to Amazon; is there a way I can specify the host somewhere?
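If you would rather not edit core-site.xml, the same property can be set on the SparkContext's Hadoop configuration. A minimal sketch; the endpoint host, credentials, and bucket below are placeholders, not values from this thread:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("s3-compatible-storage"))
// Point the s3a connector at an S3-compatible endpoint instead of s3.amazonaws.com
sc.hadoopConfiguration.set("fs.s3a.endpoint", "pithos.example.com")
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
val lines = sc.textFile("s3a://some-bucket/path/to/data")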
Re: how to start reading the spark source code?
OK, got some headstart. Pull the git source and check out 14719b93ff4ea7c3234a9389621be3c97fa278b9 (the first release, so that I could at least build it), then build it according to README.md. Then get Eclipse set up with scala-ide, create a new Scala project, and set the project directory to SCALA_SOURCE_HOME/core instead of the Eclipse default. Remove the tests from the sources, copy all the jars from SCALA_SOURCE_HOME/lib_managed into a separate dir, and in Eclipse add all of these as external jars. Set your Scala project runtime to 2.10.5 (the one coming with Spark seems to be 2.10.4; the Eclipse default is 2.9-something). There will be two compile errors: one due to Tuple(), change it to Tuple2; the other is currentThread, change it to Thread.currentThread(). Then it builds fine.

I pasted the hello-world from the docs; since the getting-started doc is for the latest version, I had to make some minor changes:

package spark

import spark.SparkContext
import spark.SparkContext._

object Tryout {
  def main(args: Array[String]) {
    val logFile = "../README.md" // Should be some file on your system
    val sc = new SparkContext("local", "tryout", ".", List(System.getenv("SPARK_EXAMPLES_JAR")))
    val logData = sc.textFile(logFile, 2).cache()
    // val logData = scala.io.Source.fromFile(args(0)).getLines().toArray
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

Then I debugged through this and it became fairly clear.

On Sun, Jul 19, 2015 at 10:13 PM, Yang tedd...@gmail.com wrote:
Thanks. My point is that earlier versions are normally much simpler, so it's easier to follow, and the basic structure should at least bear great similarity with the latest version.

On Sun, Jul 19, 2015 at 9:27 PM, Ted Yu yuzhih...@gmail.com wrote:
e5c4cd8a5e188592f8786a265 was from 2011. Not sure why you started with such an early commit; the Spark project has evolved quite fast. I suggest you clone the Spark project from github.com/apache/spark/ and start with core/src/main/scala/org/apache/spark/rdd/RDD.scala
Cheers

On Sun, Jul 19, 2015 at 7:44 PM, Yang tedd...@gmail.com wrote:
I'm trying to understand how Spark works under the hood, so I tried to read the source code. As I normally do, I downloaded the git source and reverted to the very first version (actually e5c4cd8a5e188592f8786a265c0cd073c69ac886, since the first version even lacked the definition of RDD.scala), but the code looks too simple and I can't find where the magic happens, i.e. where a transformation/computation is scheduled on a machine, bytes stored, etc. It would be great if someone could show me a path through the different source files involved, so that I could read each of them in turn. Thanks! yang
Re: how to start reading the spark source code?
Also, one peculiar difference vs. Hadoop MR is that a partition/split/part of an RDD is as much an operation as it is data, since an RDD is associated with a transformation and a lineage of all its ancestor RDDs. So when a partition is transferred to a new executor/worker (potentially on another box), the operation definition/code is transferred together with the data to that new executor, involving serialization and deserialization. This is something new: if Hadoop MR did this, it would need to transfer the jar file to the worker, but Spark being Scala, it is easy to transfer the entire operation (Task[]) through serialization.
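A tiny illustration of the point above, a self-contained sketch assuming a local[*] master: the filter closure below, including the captured threshold value, is serialized and shipped to the executors along with each task.

import org.apache.spark.{SparkConf, SparkContext}

object ClosureShipping {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("closure-shipping"))
    val threshold = 3                       // captured by the closure below
    val data = sc.parallelize(1 to 10, 4)   // 4 partitions -> 4 tasks
    // The filter function is serialized with each task and deserialized on the executor.
    val kept = data.filter(_ > threshold).collect()
    println(kept.mkString(", "))
    sc.stop()
  }
}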
Hive Query (Top N)
Hi everyone, I have been trying to build a hierarchical (index-based) top-N result using a Spark SQL query. It takes a long time when I execute the following query:

SELECT SUM(`adventurepersoncontacts`.`contactid`) AS `adventurepersoncontacts_contactid`,
       `adventurepersoncontacts`.`fullname` AS `adventurepersoncontacts_fullname`
FROM `default`.`adventurepersoncontacts` AS adventurepersoncontacts
JOIN (
  SELECT `F_0`.`fullname_0_0` AS `fullname_0_0`,
         ROW_NUMBER() OVER (ORDER BY `F_0`.`Measure_0` DESC) AS `R_N_0`
  FROM (
    SELECT `adventurepersoncontacts`.`fullname` AS `fullname_0_0`,
           SUM(adventurepersoncontacts.contactid) AS `Measure_0`
    FROM `default`.`adventurepersoncontacts` AS adventurepersoncontacts
    GROUP BY `adventurepersoncontacts`.`fullname`
  ) `F_0`
) `T_0`
  ON ((`adventurepersoncontacts`.`fullname` = `T_0`.`fullname_0_0`) AND (`T_0`.`R_N_0` <= 5))
GROUP BY `adventurepersoncontacts`.`fullname`

In this query I set a row index within every group according to the aggregation. A row number computed over an aggregate, like ROW_NUMBER() OVER (ORDER BY SUM(columnname) DESC), is not directly supported in Spark, so I am using a sub-query like:

SELECT `F_0`.`fullname_0_0` AS `fullname_0_0`,
       ROW_NUMBER() OVER (ORDER BY `F_0`.`Measure_0` DESC) AS `R_N_0`
FROM (
  SELECT `adventurepersoncontacts`.`fullname` AS `fullname_0_0`,
         SUM(adventurepersoncontacts.contactid) AS `Measure_0`
  FROM `default`.`adventurepersoncontacts` AS adventurepersoncontacts
  GROUP BY `adventurepersoncontacts`.`fullname`
) `F_0`

Execution gets slow this way. If I remove the main GROUP BY (i.e. remove the outer aggregation), execution is very fast. I mention the following query only to show that the slowdown appears when I include the main GROUP BY:

SELECT `adventurepersoncontacts`.`contactid` AS `adventurepersoncontacts_contactid`,
       `adventurepersoncontacts`.`fullname` AS `adventurepersoncontacts_fullname`
FROM `default`.`adventurepersoncontacts` AS adventurepersoncontacts
JOIN (
  SELECT `F_0`.`fullname_0_0` AS `fullname_0_0`,
         ROW_NUMBER() OVER (ORDER BY `F_0`.`Measure_0` DESC) AS `R_N_0`
  FROM (
    SELECT `adventurepersoncontacts`.`fullname` AS `fullname_0_0`,
           SUM(adventurepersoncontacts.contactid) AS `Measure_0`
    FROM `default`.`adventurepersoncontacts` AS adventurepersoncontacts
    GROUP BY `adventurepersoncontacts`.`fullname`
  ) `F_0`
) `T_0`
  ON ((`adventurepersoncontacts`.`fullname` = `T_0`.`fullname_0_0`) AND (`T_0`.`R_N_0` <= 5))

I have found another way to run this hierarchical query: I created a temp table (the table corresponds to the inner sub-query of the previous query):

CREATE EXTERNAL TABLE IF NOT EXISTS temp_table AS
SELECT SUM(adventurepersoncontacts.contactid) AS contactid,
       adventurepersoncontacts.fullname AS fullname
FROM `default`.`adventurepersoncontacts` AS adventurepersoncontacts
GROUP BY `adventurepersoncontacts`.`fullname`;

SELECT SUM(`adventurepersoncontacts`.`contactid`) AS `adventurepersoncontacts_contactid`,
       `adventurepersoncontacts`.`fullname` AS `adventurepersoncontacts_fullname`
FROM `default`.`adventurepersoncontacts` AS adventurepersoncontacts
JOIN (
  SELECT `fullname` AS `fullname_0_0`,
         ROW_NUMBER() OVER (ORDER BY `contactid` DESC) AS `R_N_0`
  FROM default.temp_table
) `T_0`
  ON ((`adventurepersoncontacts`.`fullname` = `T_0`.`fullname_0_0`) AND (`T_0`.`R_N_0` <= 2))
GROUP BY `adventurepersoncontacts`.`fullname`;

This is the other way, but the execution time is also slow when the table is created with CREATE TABLE ... AS SELECT. Please suggest any other optimized way to achieve this requirement.
Regards, Ravi
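For comparison, a minimal DataFrame sketch of the same top-N-by-aggregate idea (top fullnames by SUM(contactid)); it assumes a HiveContext named sqlContext so the default.adventurepersoncontacts table is visible, and the limit of 5 is just an example value:

import org.apache.spark.sql.functions.{desc, sum}

val contacts = sqlContext.table("default.adventurepersoncontacts")
val top5 = contacts
  .groupBy("fullname")
  .agg(sum("contactid").as("measure"))  // aggregate per group
  .orderBy(desc("measure"))             // rank by the aggregate
  .limit(5)                             // keep the top N groups
top5.show()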
Re: Exception while triggering spark job from remote jvm
Just make sure there is no firewall/network blocking the requests, as it's complaining about a timeout.
Thanks
Best Regards

On Mon, Jul 20, 2015 at 1:14 AM, ankit tyagi ankittyagi.mn...@gmail.com wrote:
Just to add more information: I have checked the status of this file, and not a single block is corrupted.

[hadoop@ip-172-31-24-27 ~]$ hadoop fsck /ankit -files -blocks
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
Connecting to namenode via http://ip-172-31-24-27.us-west-2.compute.internal:9101
FSCK started by hadoop (auth:SIMPLE) from /172.31.24.27 for path /ankit at Sun Jul 19 19:11:37 UTC 2015
/ankit dir
/ankit/SPARKSQLPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar 103599417 bytes, 1 block(s): OK
0. BP-511626939-172.31.24.27-1436185102368:blk_1073741964_126634 len=103599417 repl=1
Status: HEALTHY
 Total size: 103599417 B
 Total dirs: 1
 Total files: 1
 Total symlinks: 0
 Total blocks (validated): 1 (avg. block size 103599417 B)
 Minimally replicated blocks: 1 (100.0 %)
 Over-replicated blocks: 0 (0.0 %)
 Under-replicated blocks: 0 (0.0 %)
 Mis-replicated blocks: 0 (0.0 %)
 Default replication factor: 1
 Average block replication: 1.0
 Corrupt blocks: 0
 Missing replicas: 0 (0.0 %)
 Number of data-nodes: 4
 Number of racks: 1
FSCK ended at Sun Jul 19 19:11:37 UTC 2015 in 1 milliseconds

On Mon, Jul 20, 2015 at 12:33 AM, ankit tyagi ankittyagi.mn...@gmail.com wrote:
Hi, I am using the below code to trigger a Spark job from a remote JVM.

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

/**
 * @version 1.0, 15-Jul-2015
 * @author ankit
 */
public class QueryEngineImpl implements IQueryEngine {

    SparkSqlEngine sqlEngine;

    public QueryEngineImpl(SparkSqlEngine sparkSqlEngine) {
        this.sqlEngine = sparkSqlEngine;
    }

    @Override
    public void executeQuery(String query, String resultLocation, String... parquetFileLocation) {
        // TODO Auto-generated method stub
        String[] args = new String[] {
            // the name of your application
            "--name", "RemoteJVM",
            // memory for driver (optional)
            "--driver-memory", "1000M",
            // path to your application's JAR file, required in yarn-cluster mode
            "--jar", "hdfs://52.24.76.10:9000/ankit/SPARKSQLPOC-0.0.1-SNAPSHOT-jar-with-dependencies.jar",
            // name of your application's main class (required)
            "--class", "SparkSqlEngine",
            // argument 1 to your Spark program (SparkFriendRecommendation)
            "--arg", query,
            // argument 2 to your Spark program (SparkFriendRecommendation)
            "--arg", resultLocation,
            // argument 3 to your Spark program (SparkFriendRecommendation)
            "--arg", parquetFileLocation[0],
            "--arg", "yarn-cluster"
        };

        Configuration conf = new Configuration();
        conf.set("yarn.resourcemanager.address", "52.24.76.10:9022");
        conf.set("HADOOP_HOME", "/home/hadoop");
        System.setProperty("SPARK_YARN_MODE", "true");

        SparkConf sparkConf = new SparkConf();
        System.out.println("SPARK CONF " + sparkConf.toDebugString());

        // create ClientArguments, which will be passed to Client
        ClientArguments cArgs = new ClientArguments(args, sparkConf);
        // create an instance of the yarn Client
        Client client = new Client(cArgs, conf, sparkConf);
        client.run();
    }

    public static void main(String[] args) {
        QueryEngineImpl impl = new QueryEngineImpl(null);
        impl.executeQuery("select count(*) from parquetTable", "/tmp/ankit.txt",
            "s3n://AKIAJPLOFN3DM27DIIUQ:zKsFTopwgmu4zNdAfZ5Xe+Qe0XtbegHLTgy629VB@hadoop-poc-ashish/parquet");
    }
}

But I am getting the below exception.
23:08:09.268 [main] WARN org.apache.hadoop.hdfs.DFSClient - Failed to connect to /172.31.24.27:9200 for block, add to deadNodes and continue. org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/172.31.24.27:9200]
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/172.31.24.27:9200]
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532) ~[hadoop-common-2.2.0.jar:na]
    at
Re: How to restart Twitter spark stream
Jörn meant something like this:

val filteredStream = twitterStream.transform(rdd => {
  val newRDD = ssc.sparkContext.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
  rdd.join(newRDD)
})

newRDD will work like a filter when you do the join.
Thanks
Best Regards

On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic zoran.jere...@gmail.com wrote:
Hi Jörn, I didn't know that it is possible to change the filter without re-opening the Twitter stream. Actually, I already asked that question earlier on Stack Overflow (http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming) and I got the answer that it's not possible, but it would be even better if there is some other way to add new hashtags, or to remove old hashtags that users stopped following. I guess the second request would be more difficult. However, it would be great if you could give me a short example of how to do this. I didn't understand well from your explanation what you mean by "join it with an RDD loading the newest hashtags from disk at a regular interval".
Thanks, Zoran

On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke jornfra...@gmail.com wrote:
Why do you even want to stop it? You can join it with an RDD loading the newest hashtags from disk at a regular interval.

Le dim. 19 juil. 2015 à 7:40, Zoran Jeremic zoran.jere...@gmail.com a écrit :
Hi, I have a Twitter Spark stream initialized in the following way:

val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
val config = getTwitterConfigurationBuilder.build()
val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to update the filters since users might add new hashtags they want to follow. I tried to stop the running stream and the Spark streaming context without stopping the Spark context, e.g.:

stream.stop()
ssc.stop(false)

Afterwards, I'm trying to initialize a new Twitter stream like I did previously.
However, I got this exception:

Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
    at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41)
    at org.apache.spark.streaming.dstream.ReceiverInputDStream.<init>(ReceiverInputDStream.scala:41)
    at org.apache.spark.streaming.twitter.TwitterInputDStream.<init>(TwitterInputDStream.scala:46)
    at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
    at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
    at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
    at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)

INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)

Can anybody explain how to solve this issue?
Thanks, Zoran

--
Zoran Jeremic, PhD
Senior System Analyst Programmer
Athabasca University
Tel: +1 604 92 89 944
E-mail: zoran.jere...@gmail.com, zoran.jere...@va.mod.gov.rs
Homepage: http://zoranjeremic.org
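For reference, a slightly fuller sketch of the approach Akhil and Jörn describe above: filter the stream against a hashtag list that is re-read on every batch, so the Twitter receiver never has to be restarted. The file path, variable names, and word-splitting logic are just assumptions for illustration; stream stands for the DStream[Status] returned by TwitterUtils.createStream.

import org.apache.spark.streaming.dstream.DStream
import twitter4j.Status

val filteredTweets: DStream[Status] = stream.transform { rdd =>
  // Re-read the hashtag list on every batch so new tags take effect without restarting the receiver.
  val tags = rdd.sparkContext.textFile("/data/hashtags.txt").map(_.trim.toLowerCase).collect().toSet
  // Keep only tweets containing at least one of the current hashtags.
  rdd.filter(status => status.getText.toLowerCase.split("\\s+").exists(tags.contains))
}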
Re: Kmeans Labeled Point RDD
Has there been any progress on this? I am in the same boat. I posted a similar question to Stack Exchange: http://stackoverflow.com/questions/31447141/spark-mllib-kmeans-from-dataframe-and-back-again
Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Hi Cody,
Thanks for your help. It seems there's something wrong with some messages within my Kafka topics then. I don't understand how I can get a bigger or incomplete message, since I use the default configuration that accepts only 1 MB messages in my Kafka topic. If you have any other information or suggestions, please tell me.
Regards, Nicolas PHUNG

On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote:
Not exactly the same issue, but possibly related: https://issues.apache.org/jira/browse/KAFKA-1196

On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org wrote:
Well, working backwards down the stack trace...

at java.nio.Buffer.limit(Buffer.java:275)
That exception gets thrown if the limit is negative or greater than the buffer's capacity.

at kafka.message.Message.sliceDelimited(Message.scala:236)
If size had been negative, it would have just returned null, so we know the exception got thrown because the size was greater than the buffer's capacity.

I haven't seen that before... maybe a corrupted message of some kind? If that problem is reproducible, try providing an explicit argument for messageHandler, with a function that logs the message offset.

On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung nicolas.ph...@gmail.com wrote:
Hello, when I'm reprocessing the data from Kafka (about 40 GB) with the new Spark Streaming Kafka method createDirectStream, everything is fine until a driver error happens (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this:

15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job
15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:275)
    at kafka.message.Message.sliceDelimited(Message.scala:236)
    at kafka.message.Message.payload(Message.scala:218)
    at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:176)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:48)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

This is happening only when I'm doing a full data reprocessing from Kafka. If there's no load, when you kill the driver and then restart, it resumes from the checkpoint as expected without missing data. Did someone encounter something similar? How did you solve this?
Regards, Nicolas PHUNG
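A sketch of what Cody suggests above: a createDirectStream call whose messageHandler logs each message's offset before decoding, so a corrupt message can be located. The broker list, topic name, and starting offsets are placeholders, ssc is the StreamingContext, and the println output lands in the executor logs.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L)

// Log the offset of every record as it is read, then return the usual (key, value) pair.
val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => {
  println(s"topic=${mmd.topic} partition=${mmd.partition} offset=${mmd.offset}")
  (mmd.key(), mmd.message())
}

val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder,
  (Array[Byte], Array[Byte])](ssc, kafkaParams, fromOffsets, messageHandler)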
Re: Using Dataframe write with newHadoopApi
Update: I have managed to use df.rdd to complete the ES integration, but I would prefer df.write. Is it possible or upcoming?
On 18 Jul 2015 23:19, ayan guha guha.a...@gmail.com wrote:
Hi, I am trying to use a DataFrame and save it to Elasticsearch using the newHadoopApi (because I am using Python). Can anyone guide me as to whether this is even possible?
--
Best Regards,
Ayan Guha
Local Repartition
Hi, My data is constructed from a lot of small files which results in a lot of partitions per RDD. Is there some way to locally repartition the RDD without shuffling so that all of the partitions that reside on a specific node will become X partitions on the same node ? Thank you. Daniel
Proper saving/loading of MatrixFactorizationModel
Hi all! I have a MatrixFactorizationModel object. If I try to recommend products to a single user right after constructing the model through ALS.train(...), then it takes 300 ms (for my data and hardware). But if I save the model to disk and load it back, then recommendation takes almost 2000 ms. Also Spark warns:

15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor does not have a partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor is not cached. Prediction could be slow.

How can I create/set a partitioner and cache the user and product factors after loading the model? The following approach didn't help:

model.userFeatures().cache();
model.productFeatures().cache();

I also tried to repartition those RDDs and create a new model from the repartitioned versions, but that also didn't help.
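Along the lines of what those warnings suggest, here is a sketch (in Scala, and not a guaranteed fix; the poster notes a similar attempt did not help) of giving the loaded factor RDDs an explicit partitioner, caching and materializing them, and rebuilding the model around them. The model path, parallelism, user id, and count are placeholders.

import org.apache.spark.HashPartitioner
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

val loaded = MatrixFactorizationModel.load(sc, "hdfs:///models/als-model")

// Give both factor RDDs an explicit partitioner, cache them, and force materialization.
val partitioner = new HashPartitioner(sc.defaultParallelism)
val userFeatures = loaded.userFeatures.partitionBy(partitioner).cache()
val productFeatures = loaded.productFeatures.partitionBy(partitioner).cache()
userFeatures.count()
productFeatures.count()

// Rebuild the model around the prepared factor RDDs before serving recommendations.
val model = new MatrixFactorizationModel(loaded.rank, userFeatures, productFeatures)
val recommendations = model.recommendProducts(42, 10)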
PySpark Nested Json Parsing
Hi, I am new to Apache Spark. I am trying to parse nested JSON using PySpark. Here is the code by which I am trying to parse the JSON. I am using Apache Spark 1.2.0 from Cloudera CDH 5.3.2.

lines = sc.textFile(inputFile)

import json
def func(x):
    json_str = json.loads(x)
    if json_str['label']:
        if json_str['label']['label2']:
            return (1, 1)
    return (0, 1)

lines.map(func).reduceByKey(lambda a, b: a + b).saveAsTextFile(outputFile)

I am getting the following error:

ERROR [Executor task launch worker-13] executor.Executor (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py, line 107, in main
    process()
  File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py, line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py, line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py, line 2073, in pipeline_func
    return func(split, prev_func(split, iterator))
  File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py, line 247, in func
    return f(iterator)
  File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py, line 1561, in combineLocally
    merger.mergeValues(iterator)
  File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py, line 252, in mergeValues
    for k, v in iterator:
  File stdin, line 2, in func
  File /usr/lib64/python2.6/json/__init__.py, line 307, in loads
    return _default_decoder.decode(s)
  File /usr/lib64/python2.6/json/decoder.py, line 319, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File /usr/lib64/python2.6/json/decoder.py, line 336, in raw_decode
    obj, end = self._scanner.iterscan(s, **kw).next()
  File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
    rval, next_pos = action(m, context)
  File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
    rval, next_pos = action(m, context)
  File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject
    value, end = iterscan(s, idx=end, context=context).next()
  File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan
    rval, next_pos = action(m, context)
  File /usr/lib64/python2.6/json/decoder.py, line 155, in JSONString
    return scanstring(match.string, match.end(), encoding, strict)
ValueError: Invalid \escape: line 1 column 855 (char 855)

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12] executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in stage 14.0 (TID 24)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py, line 107, in main
    process()
  File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py, line 98, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py, line 2073, in
Re: Using reference for RDD is safe?
Hi, thank you for your answer, but I was talking about a function reference. I want to transform an RDD using a function consisting of multiple transforms. For example:

def transformFunc1(rdd: RDD[Int]): RDD[Int] = { }
val rdd2 = transformFunc1(rdd1)

Here I am using a reference, I think, but I am not sure.
LDA on a large dataset
Hello, I'm trying to run LDA on a relatively large dataset (100-200 GB), but with no luck so far. At first I made sure that the executors have enough memory with respect to the vocabulary size and number of topics. After that I ran LDA with the default EMLDAOptimizer, but learning failed after a few iterations because the application master ran out of disk. The learning job used all space available in the usercache of the application master (ca. 100 GB). I noticed that this implementation uses some sort of checkpointing, so I made sure it is not used, but it didn't help. Afterwards, I tried the OnlineLDAOptimizer, but it started failing at "reduce at LDAOptimizer.scala:421" with the error message: "Total size of serialized results of X tasks (Y GB) is bigger than spark.driver.maxResultSize (Y GB)". I kept increasing spark.driver.maxResultSize to tens of GB, but it didn't help, it just delayed this error. I tried to adjust the batch size to very small values so that I was sure it must fit into memory, but this didn't help at all. Has anyone experience with learning LDA on such a dataset? Maybe some ideas about what might be wrong? I'm using Spark 1.4.0 in yarn-client mode. I managed to learn a word2vec model on the same dataset with no problems at all.
Thanks, Peter
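For reference, a sketch of the knobs mentioned above: a checkpoint directory plus checkpoint interval to bound the EM optimizer's lineage, and a small mini-batch fraction for the online optimizer. All values, the checkpoint path, and the corpus variable are placeholders, not a tuned configuration.

import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

sc.setCheckpointDir("hdfs:///tmp/lda-checkpoints")

val corpus: RDD[(Long, Vector)] = ???   // (docId, term-count vector), prepared elsewhere

val lda = new LDA()
  .setK(100)
  .setMaxIterations(50)
  .setCheckpointInterval(10)            // relevant for the EM optimizer's graph lineage
  .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(0.01))

val ldaModel = lda.run(corpus)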
spark streaming 1.3 issues
Hi,

1. I am using Spark Streaming 1.3 for reading from a Kafka queue and pushing events to an external source. I passed in my job 20 executors, but it is showing only 6 in the executor tab. When I used the high-level API on streaming 1.2, it showed 20 executors. My cluster is a 10-node YARN cluster where each node has 8 cores. I am calling the script as:

spark-submit --class classname --num-executors 10 --executor-cores 2 --master yarn-client jarfile

2. On the Streaming UI:

Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
Time since start: 13 minutes 28 seconds
Network receivers: 0
Batch interval: 1 second
Processed batches: 807
Waiting batches: 0
Received records: 0
Processed records: 0

Received records and processed records are always 0, and the speed of processing is slow compared to the high-level API. I am processing the stream using mapPartitions. When I used

directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
  @Override
  public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
    // TODO Auto-generated method stub
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
  }
});

it throws an exception:

java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges

Thanks
Shushant
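Regarding the ClassCastException at the end: in the Java API the JavaPairRDD is only a wrapper, so the cast has to be applied to rdd.rdd() rather than to the wrapper itself, i.e. ((HasOffsetRanges) rdd.rdd()).offsetRanges(). A Scala sketch of the same pattern, with broker list and topic as placeholders and ssc as the StreamingContext:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("mytopic"))

stream.foreachRDD { rdd =>
  // The underlying RDD of a direct stream is a KafkaRDD, which implements HasOffsetRanges.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach(o => println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}"))
}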
Re: JdbcRDD and ClassTag issue
Thanks Sujee :)
Re: Local Repartition
Hi Daniel,
Take a look at .coalesce(). I've seen good results by coalescing to num executors * 10, but I'm still trying to figure out the optimal number of partitions per executor. To get the number of executors: sc.getConf.getInt("spark.executor.instances", -1)
Cheers, Doug

On Jul 20, 2015, at 5:04 AM, Daniel Haviv daniel.ha...@veracity-group.com wrote:
Hi, my data is constructed from a lot of small files, which results in a lot of partitions per RDD. Is there some way to locally repartition the RDD without shuffling, so that all of the partitions that reside on a specific node will become X partitions on the same node?
Thank you. Daniel
Apache Spark : spark.eventLog.dir on Windows Environment
Hi All, I am working with Spark 1.4 on a Windows environment. I have to set the event log directory so that I can reopen the Spark UI after the application has finished, but I am not able to set eventLog.dir; it gives an error on Windows. The configuration is:

<entry key="spark.eventLog.enabled" value="true" />
<entry key="spark.eventLog.dir" value="file:///c:/sparklogs" />

The exception I get:

java.io.IOException: Cannot run program "cygpath": CreateProcess error=2, The system cannot find the file specified
    at java.lang.ProcessBuilder.start(Unknown Source)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:206)

I have also tried installing Cygwin, but the error still doesn't go away. Can anybody give any advice on it? I have posted the same question on Stack Overflow as well: http://stackoverflow.com/questions/31468716/apache-spark-spark-eventlog-dir-on-windows-environment
Thanks, Nitin
Does Spark Streaming have support for RabbitMQ?
Does Apache Spark support RabbitMQ? I have messages on RabbitMQ and I want to process them using Spark Streaming. Does it scale?
Regards, Jeetendra
Re: Does Spark Streaming have support for RabbitMQ?
There is one package available on the spark-packages site, http://spark-packages.org/package/Stratio/RabbitMQ-Receiver The source is here: https://github.com/Stratio/RabbitMQ-Receiver Not sure that meets your needs or not. -Todd On Mon, Jul 20, 2015 at 8:52 AM, Jeetendra Gangele gangele...@gmail.com wrote: Does Apache spark support RabbitMQ. I have messages on RabbitMQ and I want to process them using Apache Spark streaming does it scale? Regards Jeetendra
Re: spark streaming 1.3 issues
Is coalesce not applicable to a KafkaStream? How do I do coalesce on a Kafka direct stream? It's not there in the API. Will calling repartition on the direct stream, with the number of executors as numPartitions, improve performance? In 1.3, do tasks get launched for partitions which are empty? Does the driver make a call to get the offsets of each partition separately, or does it get all partitions' new offsets in a single call? I mean, will reducing the number of partitions in Kafka help improve the performance?
Joda Time best practice?
Hi, I am having trouble with Joda Time in a Spark application and saw by now that I am not the only one (generally seems to have to do with serialization and internal caches of the Joda Time objects). Is there a known best practice to work around these issues? Jan
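Not a general answer, but one workaround sometimes used for exactly this class of problem is to keep primitive epoch-millis in the RDDs and only construct Joda objects inside the closures, so no DateTime instances ever have to be serialized by Spark. A small sketch with made-up data:

import org.joda.time.DateTime

val eventMillis = sc.parallelize(Seq(1437379200000L, 1437465600000L))  // epoch milliseconds
// DateTime objects are created and discarded inside the task; only Longs and Ints cross the wire.
val daysOfWeek = eventMillis.map(ms => new DateTime(ms).getDayOfWeek)
println(daysOfWeek.collect().mkString(", "))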
Re: Does Spark Streaming have support for RabbitMQ?
Thanks Todd. I'm not sure whether somebody has used it or not. Can somebody confirm whether this integrates nicely with Spark Streaming?
Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
I'd try logging the offsets for each message, see where problems start, then try using the console consumer starting at those offsets and see if you can reproduce the problem.
Web UI Links
I'm running a Spark cluster and I'd like to access the Spark UI from outside the LAN. The problem is that all the links point to internal IP addresses. Is there any way to configure hostnames for each of the hosts in the cluster and use those for the links?
k-means iteration does not terminate
Hi community, I have written a Spark k-means app and now I run it on a cluster. My job starts, and at iteration nine or ten the process stops; the Spark dashboard shows it as running the whole time, but nothing happens and there are no exceptions. My setting is the following: 1000 input points, k=10, maxIterations=30, a three-node cluster (each node has 16 GB RAM and an 8-core i7). I use Cloudera Live 5.4.4 with Spark 1.3. Maybe Spark needs more memory, or do I have a wrong setting?
Best regards, Paul
Re: Local Repartition
Thanks Doug, but coalesce might invoke a shuffle as well. I don't think what I'm suggesting is a feature, but it definitely should be.
Daniel
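For what it's worth, coalesce only shuffles when asked to: with shuffle = false it merges existing partitions (preferring co-located ones) without moving data over the network, although it does not guarantee a strict per-node grouping. A small sketch, where rdd stands for the many-partition RDD from the question and 40 is an arbitrary target:

// Merge many small partitions into fewer ones without triggering a shuffle.
val merged = rdd.coalesce(40, shuffle = false)
println(s"before=${rdd.partitions.length} after=${merged.partitions.length}")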
Re: PySpark Nested Json Parsing
Could you try SQLContext.read.json()?
Re: PySpark Nested Json Parsing
Before using the json file as text file, can you make sure that each json string can fit in one line? Because textFile() will split the file by '\n'.
dataframes sql order by not total ordering
the following query on the Movielens dataset , is sorting by the count of ratings for a movie. It looks like the results are ordered by partition ? scala val results =sqlContext.sql(select movies.title, movierates.maxr, movierates.minr, movierates.cntu from(SELECT ratings.product, max(ratings.rating) as maxr, min(ratings.rating) as minr,count(distinct user) as cntu FROM ratings group by ratings.product order by cntu desc) movierates join movies on movierates.product=movies.movieId ) scala results.take(30).foreach(println) [Right Stuff, The (1983),5.0,1.0,750] [Lost in Space (1998),5.0,1.0,667] [Dumb Dumber (1994),5.0,1.0,660] [Patch Adams (1998),5.0,1.0,474] [Carlito's Way (1993),5.0,1.0,369] [Rounders (1998),5.0,1.0,345] [Bedknobs and Broomsticks (1971),5.0,1.0,319] [Beverly Hills Ninja (1997),5.0,1.0,232] [Saving Grace (2000),5.0,1.0,186] [Dangerous Minds (1995),5.0,1.0,141] [Death Wish II (1982),5.0,1.0,85] [All Dogs Go to Heaven 2 (1996),5.0,1.0,75] [Repossessed (1990),4.0,1.0,53] [Assignment, The (1997),5.0,1.0,49] [$1,000,000 Duck (1971),5.0,1.0,37] [Stonewall (1995),5.0,1.0,20] [Dog of Flanders, A (1999),5.0,1.0,8] [Frogs for Snakes (1998),3.0,1.0,5] [It's in the Water (1998),3.0,2.0,3] [Twelve Monkeys (1995),5.0,1.0,1511] [Ransom (1996),5.0,1.0,564] [Alice in Wonderland (1951),5.0,1.0,525] [City Slickers II: The Legend of Curly's Gold (1994),5.0,1.0,392] [Eat Drink Man Woman (1994),5.0,1.0,346] [Cube (1997),5.0,1.0,233] [Omega Man, The (1971),5.0,1.0,224] [Stepmom (1998),5.0,1.0,146] [Metro (1997),5.0,1.0,100] [Death Wish 3 (1985),5.0,1.0,72] [Stalker (1979),5.0,1.0,52]
Re: Joda Time best practice?
Hey Jan, Can you provide more details on the serialization and cache issues. If you are looking for datetime functionality with spark-sql please consider: https://github.com/SparklineData/spark-datetime It provides a simple way to combine joda datetime expressions with spark sql. regards, Harish. On Mon, Jul 20, 2015 at 7:37 AM, algermissen1971 algermissen1...@icloud.com wrote: Hi, I am having trouble with Joda Time in a Spark application and saw by now that I am not the only one (generally seems to have to do with serialization and internal caches of the Joda Time objects). Is there a known best practice to work around these issues? Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
What is the correct syntax of using Spark streamingContext.fileStream()?
Hi, I am trying to find the correct way to use the Spark Streaming API streamingContext.fileStream(String, Class<K>, Class<V>, Class<F>). I could not find an example anywhere in the Spark documentation. I have to stream files in HDFS that are in a custom Hadoop format. JavaPairDStream<Void, MyRecordWritable> input = streamingContext.fileStream("/path/to/hdfs/stream/dir/", Void.class, MyRecordWritable.class, MyInputFormat.class, ??); How do I implement the fourth argument, the Function class type shown as ??? Please guide me, I am new to Spark Streaming. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-correct-syntax-of-using-Spark-streamingContext-fileStream-tp23916.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
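The extra arguments in the longer fileStream overload are a path filter plus a newFilesOnly flag; if I remember right, the Java API exposes the same thing as a Function<Path, Boolean> and a boolean after the three Class arguments. A minimal Scala sketch of the same call (TextInputFormat stands in for the custom format so the example compiles; a custom Writable/InputFormat pair plugs in the same way):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("fileStream-example"), Seconds(30))

    // The arguments after the directory are a path filter and the newFilesOnly flag.
    // This filter skips hidden/temporary files; adjust it to whatever naming scheme you use.
    val filter = (path: Path) => !path.getName.startsWith(".")
    val stream = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "/path/to/hdfs/stream/dir/", filter, newFilesOnly = true)

    stream.map(_._2.toString).print()
    ssc.start()
    ssc.awaitTermination()
  }
}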
Re: PySpark Nested Json Parsing
I had the similar issue with spark 1.3 After migrating to Spark 1.4 and using sqlcontext.read.json it worked well I think you can look at dataframe select and explode options to read the nested json elements, array etc. Thanks. On Mon, Jul 20, 2015 at 11:07 AM, Davies Liu dav...@databricks.com wrote: Could you try SQLContext.read.json()? On Mon, Jul 20, 2015 at 9:06 AM, Davies Liu dav...@databricks.com wrote: Before using the json file as text file, can you make sure that each json string can fit in one line? Because textFile() will split the file by '\n' On Mon, Jul 20, 2015 at 3:26 AM, Ajay ajay0...@gmail.com wrote: Hi, I am new to Apache Spark. I am trying to parse nested json using pyspark. Here is the code by which I am trying to parse Json. I am using Apache Spark 1.2.0 version of cloudera CDH 5.3.2. lines = sc.textFile(inputFile) import json def func(x): json_str = json.loads(x) if json_str['label']: if json_str['label']['label2']: return (1,1) return (0,1) lines.map(func).reduceByKey(lambda a,b: a + b).saveAsTextFile(outputFile) I am getting following error, ERROR [Executor task launch worker-13] executor.Executor (Logging.scala:logError(96)) - Exception in task 1.0 in stage 14.0 (TID 25) org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py, line 107, in main process() File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/worker.py, line 98, in process serializer.dump_stream(func(split_index, iterator), outfile) File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py, line 2073, in pipeline_func return func(split, prev_func(split, iterator)) File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py, line 2073, in pipeline_func return func(split, prev_func(split, iterator)) File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py, line 247, in func return f(iterator) File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/rdd.py, line 1561, in combineLocally merger.mergeValues(iterator) File /opt/cloudera/parcels/CDH-5.3.2-1.cdh5.3.2.p0.10/lib/spark/python/pyspark/shuffle.py, line 252, in mergeValues for k, v in iterator: File stdin, line 2, in func File /usr/lib64/python2.6/json/__init__.py, line 307, in loads return _default_decoder.decode(s) File /usr/lib64/python2.6/json/decoder.py, line 319, in decode obj, end = self.raw_decode(s, idx=_w(s, 0).end()) File /usr/lib64/python2.6/json/decoder.py, line 336, in raw_decode obj, end = self._scanner.iterscan(s, **kw).next() File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan rval, next_pos = action(m, context) File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject value, end = iterscan(s, idx=end, context=context).next() File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan rval, next_pos = action(m, context) File /usr/lib64/python2.6/json/decoder.py, line 183, in JSONObject value, end = iterscan(s, idx=end, context=context).next() File /usr/lib64/python2.6/json/scanner.py, line 55, in iterscan rval, next_pos = action(m, context) File /usr/lib64/python2.6/json/decoder.py, line 155, in JSONString return scanstring(match.string, match.end(), encoding, strict) ValueError: Invalid \escape: line 1 column 855 (char 855) at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137) at 
org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:305) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 2015-07-20 09:58:24,727 ERROR [Executor task launch worker-12] executor.Executor (Logging.scala:logError(96)) - Exception in task 0.0 in stage 14.0 (TID 24) org.apache.spark.api.python.PythonException: Traceback (most recent call
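Setting the stack trace aside, a minimal sketch of the read.json approach suggested above, written in Scala (the PySpark calls are named the same in 1.4); the nested field label.label2 is taken from the question and the path is illustrative:

import org.apache.spark.sql.SQLContext

// Assumes an existing SparkContext sc and one JSON object per line in the input.
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("/path/to/input.json")

df.printSchema()   // check how the nested structure was inferred

// Count records that carry a non-null label.label2, versus the rest.
val withLabel2 = df.filter(df("label.label2").isNotNull).count()
val total = df.count()
println(s"with label2: $withLabel2, without: ${total - withLabel2}")

If label2 turns out to be an array, explode() can flatten it into one row per element before filtering.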
RE: Spark and SQL Server
When attempting to write a Dataframe to SQL Server that contains java.sql.Timestamp or java.lang.boolean objects I get errors about the query that is formed being invalid. Specifically, java.sql.Timestamp objects try to be written as the Timestamp type, which is not appropriate for date/time storage in TSQL. They should be datetimes or datetime2s. java.lang.boolean errors out because it tries to specify the width of the BIT field, which SQL Server doesn't like. However, I can write strings/varchars and ints without any issues. From: Davies Liu [dav...@databricks.com] Sent: Monday, July 20, 2015 9:08 AM To: Young, Matthew T Cc: user@spark.apache.org Subject: Re: Spark and SQL Server Sorry for the confusing. What's the other issues? On Mon, Jul 20, 2015 at 8:26 AM, Young, Matthew T matthew.t.yo...@intel.com wrote: Thanks Davies, that resolves the issue with Python. I was using the Java/Scala DataFrame documentation https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrameWriter.html and assuming that it was the same for PySpark http://spark.apache.org/docs/1.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter. I will keep this distinction in mind going forward. I guess we have to wait for Microsoft to release an SQL Server connector for Spark to resolve the other issues. Cheers, -- Matthew Young From: Davies Liu [dav...@databricks.com] Sent: Saturday, July 18, 2015 12:45 AM To: Young, Matthew T Cc: user@spark.apache.org Subject: Re: Spark and SQL Server I think you have a mistake on call jdbc(), it should be: jdbc(self, url, table, mode, properties) You had use properties as the third parameter. On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T matthew.t.yo...@intel.com wrote: Hello, I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 4.2 JDBC Driver. Reading from the database works ok, but I have encountered a couple of issues writing back. In Scala 2.10 I can write back to the database except for a couple of types. 1. When I read a DataFrame from a table that contains a datetime column it comes in as a java.sql.Timestamp object in the DataFrame. This is alright for Spark purposes, but when I go to write this back to the database with df.write.jdbc(…) it errors out because it is trying to write the TimeStamp type to SQL Server, which is not a date/time storing type in TSQL. I think it should be writing a datetime, but I’m not sure how to tell Spark this. 2. A related misunderstanding happens when I try to write a java.lang.boolean to the database; it errors out because Spark is trying to specify the width of the bit type, which is illegal in SQL Server (error msg: Cannot specify a column width on data type bit). Do I need to edit Spark source to fix this behavior, or is there a configuration option somewhere that I am not aware of? When I attempt to write back to SQL Server in an IPython notebook, py4j seems unable to convert a Python dict into a Java hashmap, which is necessary for parameter passing. I’ve documented details of this problem with code examples herehttp://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session. Any advice would be appreciated. Thank you for your time, -- Matthew Young - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
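One workaround that avoids patching Spark itself is to register a custom JdbcDialect that maps the offending Catalyst types to T-SQL-friendly column types. This is a sketch only: JdbcDialects.registerDialect and getJDBCType are public from Spark 1.5 onwards, not in 1.4.0, and the exact mappings should be checked against your SQL Server version:

import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

// Emit DATETIME2 instead of TIMESTAMP, and a plain BIT (no width) for booleans.
object SqlServerDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case TimestampType => Some(JdbcType("DATETIME2", Types.TIMESTAMP))
    case BooleanType   => Some(JdbcType("BIT", Types.BIT))
    case _             => None
  }
}

JdbcDialects.registerDialect(SqlServerDialect)
// After registration, df.write.jdbc(url, table, props) generates the mapped column types.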
Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Yeah, in the function you supply for the messageHandler parameter to createDirectStream, catch the exception and do whatever makes sense for your application. On Mon, Jul 20, 2015 at 11:58 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hello, Using the old Spark Streaming Kafka API, I got the following around the same offset: kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633) at kafka.message.Message.ensureValid(Message.scala:166) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message java.lang.IllegalStateException: Iterator is in failed state at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) I found some old topic about some possible corrupt Kafka message produced by the new producer API with Snappy compression on. My question is, is it possible to skip/ignore those offsets when full processing with KafkaUtils.createStream or KafkaUtils.createDirectStream ? Regards, Nicolas PHUNG On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger c...@koeninger.org wrote: I'd try logging the offsets for each message, see where problems start, then try using the console consumer starting at those offsets and see if you can reproduce the problem. On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hi Cody, Thanks for you help. It seems there's something wrong with some messages within my Kafka topics then. I don't understand how, I can get bigger or incomplete message since I use default configuration to accept only 1Mb message in my Kafka topic. If you have any others informations or suggestions, please tell me. Regards, Nicolas PHUNG On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Not exactly the same issue, but possibly related: https://issues.apache.org/jira/browse/KAFKA-1196 On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org wrote: Well, working backwards down the stack trace... 
at java.nio.Buffer.limit(Buffer.java:275) That exception gets thrown if the limit is negative or greater than the buffer's capacity at kafka.message.Message.sliceDelimited(Message.scala:236) If size had been negative, it would have just returned null, so we know the exception got thrown because the size was greater than the buffer's capacity I haven't seen that before... maybe a corrupted message of some kind? If that problem is reproducible, try providing an explicit argument for messageHandler, with a function that logs the message offset. On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hello, When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this: 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException at java.nio.Buffer.limit(Buffer.java:275) at kafka.message.Message.sliceDelimited(Message.scala:236) at kafka.message.Message.payload(Message.scala:218) at
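For reference, a minimal sketch of the catch-inside-messageHandler idea described at the top of this thread, assuming the createDirectStream overload that takes explicit fromOffsets; ssc, kafkaParams and fromOffsets are assumed to be defined elsewhere, and a corrupt record is logged and dropped rather than failing the task:

import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Decoding happens inside the handler, so a corrupt payload can be caught per record.
val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
  try {
    Some(mmd.message())
  } catch {
    case e: Exception =>
      println(s"Skipping corrupt record at ${mmd.topic}/${mmd.partition} offset ${mmd.offset}: $e")
      None
  }

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Option[String]](
  ssc, kafkaParams, fromOffsets, messageHandler)

val valid = stream.flatMap(_.toList)   // corrupt messages were already dropped as None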
Broadcast variables in R
I've searched high and low for a way to use broadcast variables in R. Is it possible at all? I don't see them mentioned in the SparkR API. Or is there another way of using this feature? I need to share a large amount of data between executors. At the moment, I get warned about my task being too large. I have tried pyspark, and there I can use them. Wkr, Serge -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-in-R-tp23915.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark and SQL Server
Thanks Davies, that resolves the issue with Python. I was using the Java/Scala DataFrame documentation https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrameWriter.html and assuming that it was the same for PySpark http://spark.apache.org/docs/1.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter. I will keep this distinction in mind going forward. I guess we have to wait for Microsoft to release an SQL Server connector for Spark to resolve the other issues. Cheers, -- Matthew Young From: Davies Liu [dav...@databricks.com] Sent: Saturday, July 18, 2015 12:45 AM To: Young, Matthew T Cc: user@spark.apache.org Subject: Re: Spark and SQL Server I think you have a mistake on call jdbc(), it should be: jdbc(self, url, table, mode, properties) You had use properties as the third parameter. On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T matthew.t.yo...@intel.com wrote: Hello, I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 4.2 JDBC Driver. Reading from the database works ok, but I have encountered a couple of issues writing back. In Scala 2.10 I can write back to the database except for a couple of types. 1. When I read a DataFrame from a table that contains a datetime column it comes in as a java.sql.Timestamp object in the DataFrame. This is alright for Spark purposes, but when I go to write this back to the database with df.write.jdbc(…) it errors out because it is trying to write the TimeStamp type to SQL Server, which is not a date/time storing type in TSQL. I think it should be writing a datetime, but I’m not sure how to tell Spark this. 2. A related misunderstanding happens when I try to write a java.lang.boolean to the database; it errors out because Spark is trying to specify the width of the bit type, which is illegal in SQL Server (error msg: Cannot specify a column width on data type bit). Do I need to edit Spark source to fix this behavior, or is there a configuration option somewhere that I am not aware of? When I attempt to write back to SQL Server in an IPython notebook, py4j seems unable to convert a Python dict into a Java hashmap, which is necessary for parameter passing. I’ve documented details of this problem with code examples herehttp://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session. Any advice would be appreciated. Thank you for your time, -- Matthew Young - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Local Repartition
Great explanation. Thanks guys! Daniel On 20 ביולי 2015, at 18:12, Silvio Fiorito silvio.fior...@granturing.com wrote: Hi Daniel, Coalesce, by default will not cause a shuffle. The second parameter when set to true will cause a full shuffle. This is actually what repartition does (calls coalesce with shuffle=true). It will attempt to keep colocated partitions together (as you describe) on the same executor. What may happen is you lose data locality if you reduce the partitions to fewer than the number of executors. You obviously also reduce parallelism so you need to be aware of that as you decide when to call coalesce. Thanks, Silvio From: Daniel Haviv Date: Monday, July 20, 2015 at 4:59 PM To: Doug Balog Cc: user Subject: Re: Local Repartition Thanks Doug, coalesce might invoke a shuffle as well. I don't think what I'm suggesting is a feature but it definitely should be. Daniel On Mon, Jul 20, 2015 at 4:15 PM, Doug Balog d...@balog.net wrote: Hi Daniel, Take a look at .coalesce() I’ve seen good results by coalescing to num executors * 10, but I’m still trying to figure out the optimal number of partitions per executor. To get the number of executors, sc.getConf.getInt(“spark.executor.instances”,-1) Cheers, Doug On Jul 20, 2015, at 5:04 AM, Daniel Haviv daniel.ha...@veracity-group.com wrote: Hi, My data is constructed from a lot of small files which results in a lot of partitions per RDD. Is there some way to locally repartition the RDD without shuffling so that all of the partitions that reside on a specific node will become X partitions on the same node ? Thank you. Daniel
Re: use S3-Compatible Storage with spark
Thanks, that is what I was looking for... Any Idea where I have to store and reference the corresponding hadoop-aws-2.6.0.jar ?: java.io.IOException: No FileSystem for scheme: s3n 2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Not in the uri, but in the hadoop configuration you can specify it. property namefs.s3a.endpoint/name descriptionAWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region (s3.amazonaws.com) is assumed. /description /property Thanks Best Regards On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: I want to use pithos, were do I can specify that endpoint, is it possible in the url ? 2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com: Could you name the Storage service that you are using? Most of them provides a S3 like RestAPI endpoint for you to hit. Thanks Best Regards On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3 compatible Storage in Spark ? If I'm using s3n:// url schema, the it will point to amazon, is there a way I can specify the host somewhere ? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
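A sketch of how the pieces fit together once the jars are in place (jar names, endpoint and bucket below are illustrative, not verified against pithos): pass hadoop-aws and a matching aws-java-sdk jar via --jars on spark-submit, use the s3a:// scheme so the fs.s3a.* settings apply, and set the endpoint on the Hadoop configuration:

// Assumes spark-submit was started with something like
//   --jars hadoop-aws-2.6.0.jar,aws-java-sdk-1.7.4.jar
// so the S3A filesystem classes are on both the driver and executor classpaths.
sc.hadoopConfiguration.set("fs.s3a.endpoint", "pithos.example.com")
sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("S3_ACCESS_KEY"))
sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("S3_SECRET_KEY"))

val rdd = sc.textFile("s3a://my-bucket/some/prefix/*")   // s3a (not s3n), so fs.s3a.endpoint is honored
println(rdd.count())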
Re: Local Repartition
Hi Daniel, Coalesce, by default will not cause a shuffle. The second parameter when set to true will cause a full shuffle. This is actually what repartition does (calls coalesce with shuffle=true). It will attempt to keep colocated partitions together (as you describe) on the same executor. What may happen is you lose data locality if you reduce the partitions to fewer than the number of executors. You obviously also reduce parallelism so you need to be aware of that as you decide when to call coalesce. Thanks, Silvio From: Daniel Haviv Date: Monday, July 20, 2015 at 4:59 PM To: Doug Balog Cc: user Subject: Re: Local Repartition Thanks Doug, coalesce might invoke a shuffle as well. I don't think what I'm suggesting is a feature but it definitely should be. Daniel On Mon, Jul 20, 2015 at 4:15 PM, Doug Balog d...@balog.netmailto:d...@balog.net wrote: Hi Daniel, Take a look at .coalesce() I’ve seen good results by coalescing to num executors * 10, but I’m still trying to figure out the optimal number of partitions per executor. To get the number of executors, sc.getConf.getInt(“spark.executor.instances”,-1) Cheers, Doug On Jul 20, 2015, at 5:04 AM, Daniel Haviv daniel.ha...@veracity-group.commailto:daniel.ha...@veracity-group.com wrote: Hi, My data is constructed from a lot of small files which results in a lot of partitions per RDD. Is there some way to locally repartition the RDD without shuffling so that all of the partitions that reside on a specific node will become X partitions on the same node ? Thank you. Daniel
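A small sketch of the difference, using the executors-times-ten target Doug mentions (rdd is any existing RDD):

// coalesce(n) with the default shuffle = false merges existing partitions locally:
// no data moves over the network, and co-located partitions are simply combined.
val numExecutors = sc.getConf.getInt("spark.executor.instances", -1)
val target = numExecutors * 10

val merged = rdd.coalesce(target)                      // narrow dependency, no shuffle
val reshuffled = rdd.coalesce(target, shuffle = true)  // full shuffle; same as repartition(target)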
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Cool, I tried that as well, and doesn't seem different: spark.yarn.jar seems set [image: Inline image 1] This actually doesn't change the classpath, not sure if it should: [image: Inline image 3] But same netlib warning. Thanks for the help! - Arun On Fri, Jul 17, 2015 at 3:18 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Can you try setting the spark.yarn.jar property to make sure it points to the jar you're thinking of? -Sandy On Fri, Jul 17, 2015 at 11:32 AM, Arun Ahuja aahuj...@gmail.com wrote: Yes, it's a YARN cluster and using spark-submit to run. I have SPARK_HOME set to the directory above and using the spark-submit script from there. bin/spark-submit --master yarn-client --executor-memory 10g --driver-memory 8g --num-executors 400 --executor-cores 1 --class org.hammerlab.guacamole.Guacamole --conf spark.default.parallelism=4000 --conf spark.storage.memoryFraction=0.15 libgfortran.so.3 is also there ls /usr/lib64/libgfortran.so.3 /usr/lib64/libgfortran.so.3 These are jniloader files in the jar jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep jniloader META-INF/maven/com.github.fommil/jniloader/ META-INF/maven/com.github.fommil/jniloader/pom.xml META-INF/maven/com.github.fommil/jniloader/pom.properties Thanks, Arun On Fri, Jul 17, 2015 at 1:30 PM, Sean Owen so...@cloudera.com wrote: Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue. I'm pretty sure the answer is 'yes', but, make sure the assembly has jniloader too. I don't see why it wouldn't, but, that's needed. What is your env like -- local, standalone, YARN? how are you running? Just want to make sure you are using this assembly across your cluster. On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote: Hi Sean, Thanks for the reply! I did double-check that the jar is one I think I am running: [image: Inline image 2] jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native com/github/fommil/netlib/NativeRefARPACK.class com/github/fommil/netlib/NativeRefBLAS.class com/github/fommil/netlib/NativeRefLAPACK.class com/github/fommil/netlib/NativeSystemARPACK.class com/github/fommil/netlib/NativeSystemBLAS.class com/github/fommil/netlib/NativeSystemLAPACK.class Also, I checked the gfortran version on the cluster nodes and it is available and is 5.1 $ gfortran --version GNU Fortran (GCC) 5.1.0 Copyright (C) 2015 Free Software Foundation, Inc. and still see: 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Does anything need to be adjusted in my application POM? Thanks, Arun On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote: Yes, that's most of the work, just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. 
The other thing I'd double-check is whether you are really using this assembly you built for your job -- like, it's the actually the assembly the executors are using. On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote: Is there more documentation on what is needed to setup BLAS/LAPACK native suport with Spark. I’ve built spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar. jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native 6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class 21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class 178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class 6640 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemARPACK.class 21138 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemBLAS.class 178349 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemLAPACK.class Also I see the following in /usr/lib64 ls /usr/lib64/libblas. libblas.a libblas.solibblas.so.3 libblas.so.3.2 libblas.so.3.2.1 ls /usr/lib64/liblapack liblapack.a liblapack_pic.a liblapack.so liblapack.so.3 liblapack.so.3.2liblapack.so.3.2.1
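One quick way to check which implementation the executors actually loaded (a small sketch; the printed class name will be NativeSystemBLAS, NativeRefBLAS, or the pure-Java F2jBLAS fallback):

import com.github.fommil.netlib.BLAS

// Run the lookup inside tasks so it reports what the executor JVMs loaded, not just the driver.
sc.parallelize(1 to sc.defaultParallelism).map { _ =>
  BLAS.getInstance().getClass.getName
}.distinct().collect().foreach(println)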
Re: Spark and SQL Server
Sorry for the confusing. What's the other issues? On Mon, Jul 20, 2015 at 8:26 AM, Young, Matthew T matthew.t.yo...@intel.com wrote: Thanks Davies, that resolves the issue with Python. I was using the Java/Scala DataFrame documentation https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrameWriter.html and assuming that it was the same for PySpark http://spark.apache.org/docs/1.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter. I will keep this distinction in mind going forward. I guess we have to wait for Microsoft to release an SQL Server connector for Spark to resolve the other issues. Cheers, -- Matthew Young From: Davies Liu [dav...@databricks.com] Sent: Saturday, July 18, 2015 12:45 AM To: Young, Matthew T Cc: user@spark.apache.org Subject: Re: Spark and SQL Server I think you have a mistake on call jdbc(), it should be: jdbc(self, url, table, mode, properties) You had use properties as the third parameter. On Fri, Jul 17, 2015 at 10:15 AM, Young, Matthew T matthew.t.yo...@intel.com wrote: Hello, I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 4.2 JDBC Driver. Reading from the database works ok, but I have encountered a couple of issues writing back. In Scala 2.10 I can write back to the database except for a couple of types. 1. When I read a DataFrame from a table that contains a datetime column it comes in as a java.sql.Timestamp object in the DataFrame. This is alright for Spark purposes, but when I go to write this back to the database with df.write.jdbc(…) it errors out because it is trying to write the TimeStamp type to SQL Server, which is not a date/time storing type in TSQL. I think it should be writing a datetime, but I’m not sure how to tell Spark this. 2. A related misunderstanding happens when I try to write a java.lang.boolean to the database; it errors out because Spark is trying to specify the width of the bit type, which is illegal in SQL Server (error msg: Cannot specify a column width on data type bit). Do I need to edit Spark source to fix this behavior, or is there a configuration option somewhere that I am not aware of? When I attempt to write back to SQL Server in an IPython notebook, py4j seems unable to convert a Python dict into a Java hashmap, which is necessary for parameter passing. I’ve documented details of this problem with code examples herehttp://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session. Any advice would be appreciated. Thank you for your time, -- Matthew Young - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Resume checkpoint failed with Spark Streaming Kafka via createDirectStream under heavy reprocessing
Hello, Using the old Spark Streaming Kafka API, I got the following around the same offset: kafka.message.InvalidMessageException: Message is corrupt (stored crc = 3561357254, computed crc = 171652633) at kafka.message.Message.ensureValid(Message.scala:166) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/07/20 15:56:57 INFO BlockManager: Removing broadcast 4641 15/07/20 15:56:57 ERROR ReliableKafkaReceiver: Error handling message java.lang.IllegalStateException: Iterator is in failed state at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54) at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:265) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) I found some old topic about some possible corrupt Kafka message produced by the new producer API with Snappy compression on. My question is, is it possible to skip/ignore those offsets when full processing with KafkaUtils.createStream or KafkaUtils.createDirectStream ? Regards, Nicolas PHUNG On Mon, Jul 20, 2015 at 3:46 PM, Cody Koeninger c...@koeninger.org wrote: I'd try logging the offsets for each message, see where problems start, then try using the console consumer starting at those offsets and see if you can reproduce the problem. On Mon, Jul 20, 2015 at 2:15 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hi Cody, Thanks for you help. It seems there's something wrong with some messages within my Kafka topics then. I don't understand how, I can get bigger or incomplete message since I use default configuration to accept only 1Mb message in my Kafka topic. If you have any others informations or suggestions, please tell me. Regards, Nicolas PHUNG On Thu, Jul 16, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Not exactly the same issue, but possibly related: https://issues.apache.org/jira/browse/KAFKA-1196 On Thu, Jul 16, 2015 at 12:03 PM, Cody Koeninger c...@koeninger.org wrote: Well, working backwards down the stack trace... at java.nio.Buffer.limit(Buffer.java:275) That exception gets thrown if the limit is negative or greater than the buffer's capacity at kafka.message.Message.sliceDelimited(Message.scala:236) If size had been negative, it would have just returned null, so we know the exception got thrown because the size was greater than the buffer's capacity I haven't seen that before... maybe a corrupted message of some kind? If that problem is reproducible, try providing an explicit argument for messageHandler, with a function that logs the message offset. 
On Thu, Jul 16, 2015 at 11:28 AM, Nicolas Phung nicolas.ph...@gmail.com wrote: Hello, When I'm reprocessing the data from kafka (about 40 Gb) with the new Spark Streaming Kafka method createDirectStream, everything is fine till a driver error happened (driver is killed, connection lost...). When the driver pops up again, it resumes the processing with the checkpoint in HDFS. Except, I got this: 15/07/16 15:23:41 ERROR TaskSetManager: Task 4 in stage 4.0 failed 4 times; aborting job 15/07/16 15:23:41 ERROR JobScheduler: Error running job streaming job 1437032118000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 4.0 failed 4 times, most recent failure: Lost task 4.3 in stage 4.0 (TID 16, slave05.local): java.lang.IllegalArgumentException at java.nio.Buffer.limit(Buffer.java:275) at kafka.message.Message.sliceDelimited(Message.scala:236) at kafka.message.Message.payload(Message.scala:218) at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32) at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395) at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$6.apply(KafkaUtils.scala:395) at
Re: Joda Time best practice?
Can you post details on how to reproduce the NPE On Mon, Jul 20, 2015 at 1:19 PM, algermissen1971 algermissen1...@icloud.com wrote: Hi Harish, On 20 Jul 2015, at 20:37, Harish Butani rhbutani.sp...@gmail.com wrote: Hey Jan, Can you provide more details on the serialization and cache issues. My symptom is that I have a Joda DateTime on which I can call toString and getMillis without problems, but when I call getYear I get a NPE out of the internal AbstractDateTime. Totally strange but seems to align with issues others have. I am now changing the app to work with millis internally, as that seems to be a performance improvement regarding serialization anyhow. Thanks, Jan If you are looking for datetime functionality with spark-sql please consider: https://github.com/SparklineData/spark-datetime It provides a simple way to combine joda datetime expressions with spark sql. regards, Harish. On Mon, Jul 20, 2015 at 7:37 AM, algermissen1971 algermissen1...@icloud.com wrote: Hi, I am having trouble with Joda Time in a Spark application and saw by now that I am not the only one (generally seems to have to do with serialization and internal caches of the Joda Time objects). Is there a known best practice to work around these issues? Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Increment counter variable in RDD transformation function
I'm trying to keep track of some information in an RDD.flatMap() function (using the Java API in 1.4.0). I have two longs in the function, and I am incrementing them when appropriate and checking their values to determine how many objects to output from the function. I'm not trying to read the values in the driver or use them as a global counter, just trying to count within the task. This appears not to be working. Should I expect this to work? If so, any pointers as to what I might be doing wrong? Code looks something like:

JavaRDD<LabeledPoint> parsedData = data.map(new Function<String, LabeledPoint>() {
  Long count = 0L;
  public LabeledPoint call(String line) {
    count++;
    String[] parts = line.split(",");
    String[] features = parts[1].split(" ");
    double[] v = new double[features.length];
    for (int i = 0; i < features.length - 1; i++) v[i] = Double.parseDouble(features[i]);
    if (count == 50) {
      // return something else
    }
    return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
  }
});
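If the goal is a per-task count, one pattern that behaves predictably is to keep the counter local to a partition with mapPartitions, so there is exactly one counter per task and it never has to be shared. A rough Scala sketch of the counting shape only (the LabeledPoint parsing from the question is elided, and data is assumed to be an RDD of lines):

val parsed = data.mapPartitions { records =>
  var count = 0L                      // lives for the duration of one task
  records.map { line =>
    count += 1
    if (count == 50L) {
      // build and return the special record here instead
    }
    line.split(",")                   // stand-in for the real parsing logic
  }
}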
Re: LDA on a large dataset
LDAOptimizer.scala:421 collects to driver a numTopics by vocabSize matrix of summary statistics. I suspect that this is what's causing the failure. One thing you may try doing is decreasing the vocabulary size. One possibility would be to use a HashingTF if you don't mind dimension reduction via hashing collisions. On Mon, Jul 20, 2015 at 3:21 AM, Peter Zvirinsky peter.zvirin...@gmail.com wrote: Hello, I'm trying to run LDA on a relatively large dataset (size 100-200 G), but with no luck so far. At first I made sure that the executors have enough memory with respect to the vocabulary size and number of topics. After that I ran LDA with default EMLDAOptimizer, but learning failed after a few iteration, because the application master ran out of disk. The learning job used all space available in the usercache of the application master (cca. 100G). I noticed that this implementation uses some sort of checkopointing so I made sure it is not used, but it didn't help. Afterwards, I tried the OnlineLDAOptimizer, but it started failing at reduce at LDAOptimizer.scala:421 with error message: Total size of serialized results of X tasks (Y GB) is bigger than spark.driver.maxResultSize (Y GB). I kept increasing the spark.driver.maxResultSize to tens of GB but it didn't help, just delayed this error. I tried to adjust the batch size to very small values so that I was sure it must fit into memory, but this didn't help at all. Has anyone experience with learning LDA on such a dataset? Maybe some ideas what might be wrong? I'm using spark 1.4.0 in yarn-client mode. I managed to learn a word2vec model on the same dataset with no problems at all. Thanks, Peter
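A sketch of the hashing idea, assuming the documents are already tokenized into an RDD of token sequences; the bucket count and LDA settings are illustrative:

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// tokenizedDocs: RDD[Seq[String]] is assumed to exist.
def runLda(tokenizedDocs: RDD[Seq[String]]) = {
  // Cap the vocabulary at 2^18 hashed buckets so the k x vocabSize summary
  // matrix collected to the driver stays manageable.
  val hashingTF = new HashingTF(numFeatures = 1 << 18)
  val corpus: RDD[(Long, Vector)] = tokenizedDocs.zipWithIndex().map {
    case (tokens, id) => (id, hashingTF.transform(tokens))
  }.cache()

  new LDA().setK(100).setOptimizer("online").run(corpus)
}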
Re: Silly question about building Spark 1.4.1
Thanks Dean… I was building based on the information found on the Spark 1.4.1 documentation. So I have to ask the following: Shouldn’t the examples be updated to reflect Hadoop 2.6 or are the vendors’ distro not up to 2.6 and that’s why its still showing 2.4? Also I’m trying to build with support for Scala 2.11 Are there any known issues between Scala 2.11 and Hive and hive-thrift server? Dean, the reason I asked about needed to specify the Hive and Hive-Thriftserver options is that at the end of the build I see the following: “ [INFO] Spark Project SQL .. SUCCESS [02:06 min] [INFO] Spark Project ML Library ... SUCCESS [02:23 min] [INFO] Spark Project Tools SUCCESS [ 13.305 s] [INFO] Spark Project Hive . SUCCESS [01:55 min] [INFO] Spark Project REPL . SUCCESS [ 40.488 s] [INFO] Spark Project YARN . SUCCESS [ 38.793 s] [INFO] Spark Project Assembly . SUCCESS [01:10 min] [INFO] Spark Project External Twitter . SUCCESS [ 14.907 s] [INFO] Spark Project External Flume Sink .. SUCCESS [ 21.748 s] [INFO] Spark Project External Flume ... SUCCESS [ 31.754 s] [INFO] Spark Project External MQTT SUCCESS [ 17.921 s] [INFO] Spark Project External ZeroMQ .. SUCCESS [ 18.037 s] [INFO] Spark Project External Kafka ... SUCCESS [ 41.941 s] [INFO] Spark Project Examples . SUCCESS [01:56 min] [INFO] Spark Project External Kafka Assembly .. SUCCESS [ 24.806 s] [INFO] Spark Project YARN Shuffle Service . SUCCESS [ 5.204 s] [INFO] [INFO] BUILD SUCCESS [INFO] [INFO] Total time: 22:40 min [INFO] Finished at: 2015-07-20T12:54:23-07:00 [INFO] Final Memory: 109M/2332M [INFO] “ Granted this may be something completely different which is why the next time I do a build, I’m going to capture the stderr/stdout to a file. Thx for the quick response. On Jul 20, 2015, at 1:11 PM, Ted Yu yuzhih...@gmail.com wrote: In master (as well as 1.4.1) I don't see hive profile in pom.xml I do find hive-provided profile, though. FYI On Mon, Jul 20, 2015 at 1:05 PM, Dean Wampler deanwamp...@gmail.com mailto:deanwamp...@gmail.com wrote: hadoop-2.6 is supported (look for profile XML in the pom.xml file). For Hive, add -Phive -Phive-thriftserver (See http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables) for more details. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com/ @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com http://polyglotprogramming.com/ On Mon, Jul 20, 2015 at 2:55 PM, Michael Segel msegel_had...@hotmail.com mailto:msegel_had...@hotmail.com wrote: Sorry, Should have sent this to user… However… it looks like the docs page may need some editing? Thx -Mike Begin forwarded message: From: Michael Segel msegel_had...@hotmail.com mailto:msegel_had...@hotmail.com Subject: Silly question about building Spark 1.4.1 Date: July 20, 2015 at 12:26:40 PM MST To: d...@spark.apache.org mailto:d...@spark.apache.org Hi, I’m looking at the online docs for building spark 1.4.1 … http://spark.apache.org/docs/latest/building-spark.html http://spark.apache.org/docs/latest/building-spark.html I was interested in building spark for Scala 2.11 (latest scala) and also for Hive and JDBC support. 
The docs say: “ To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11 property: dev/change-version-to-2.11.sh mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package “ So… Is there a reason I shouldn’t build against hadoop-2.6 ? If I want to add the Thrift and Hive support, is it possible? Looking at the Scala build, it looks like hive support is being built? (Looking at the stdout messages…) Should the docs be updated? Am I missing something? (Dean W. can confirm, I am completely brain dead. ;-) Thx -Mike PS. Yes I can probably download a prebuilt image, but I’m a glutton for punishment. ;-)
Re: Joda Time best practice?
On 20 Jul 2015, at 23:20, Harish Butani rhbutani.sp...@gmail.com wrote: Can you post details on how to reproduce the NPE Essentially it is like this: I have a scala case class that contains a Joda DateTime attribute and instances of this class are updated using updateStateByKey. When a certain condition is reached the instances are turned to Json (using spray.json) and are stored in ElasticSearch. just before creating the JSON I call getYear on the date attribute and that fails with NPE. When insert a getMillis or toString right before the getYear these work just fine. Jan On Mon, Jul 20, 2015 at 1:19 PM, algermissen1971 algermissen1...@icloud.com wrote: Hi Harish, On 20 Jul 2015, at 20:37, Harish Butani rhbutani.sp...@gmail.com wrote: Hey Jan, Can you provide more details on the serialization and cache issues. My symptom is that I have a Joda DateTime on which I can call toString and getMillis without problems, but when I call getYear I get a NPE out of the internal AbstractDateTime. Totally strange but seems to align with issues others have. I am now changing the app to work with millis internally, as that seems to be a performance improvement regarding serialization anyhow. Thanks, Jan If you are looking for datetime functionality with spark-sql please consider: https://github.com/SparklineData/spark-datetime It provides a simple way to combine joda datetime expressions with spark sql. regards, Harish. On Mon, Jul 20, 2015 at 7:37 AM, algermissen1971 algermissen1...@icloud.com wrote: Hi, I am having trouble with Joda Time in a Spark application and saw by now that I am not the only one (generally seems to have to do with serialization and internal caches of the Joda Time objects). Is there a known best practice to work around these issues? Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
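For reference, a bare-bones sketch of that millis-based workaround (field names are made up): only the epoch millis live in the state that Spark serializes, and the DateTime is rebuilt on demand, so none of Joda's lazily initialized internals have to survive serialization.

import org.joda.time.DateTime

// Only the Long travels through updateStateByKey / checkpointing.
case class EventState(userId: String, createdMillis: Long) {
  def created: DateTime = new DateTime(createdMillis)
  def createdYear: Int = created.getYear
}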
Re: Broadcast variables in R
Hi Serge, The broadcast function was made private when SparkR merged into Apache Spark for the 1.4.0 release. You can still use broadcast by specifying the private namespace though. SparkR:::broadcast(sc, obj) The RDD methods were considered very low-level, and the SparkR devs are still figuring out which of them they¹d like to expose along with the higher-level DataFrame API. You can see the rationale for the decision on the project JIRA [1]. [1] -- https://issues.apache.org/jira/browse/SPARK-7230 Hope that helps, Alek On 7/20/15, 12:00 PM, Serge Franchois serge.franch...@altran.com wrote: I've searched high and low to use broadcast variables in R. Is is possible at all? I don't see them mentioned in the SparkR API. Or is there another way of using this feature? I need to share a large amount of data between executors. At the moment, I get warned about my task being too large. I have tried pyspark, and there I can use them. Wkr, Serge -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variables-in -R-tp23915.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: dataframes sql order by not total ordering
An ORDER BY needs to be on the outermost query otherwise subsequent operations (such as the join) could reorder the tuples. On Mon, Jul 20, 2015 at 9:25 AM, Carol McDonald cmcdon...@maprtech.com wrote: the following query on the Movielens dataset , is sorting by the count of ratings for a movie. It looks like the results are ordered by partition ? scala val results =sqlContext.sql(select movies.title, movierates.maxr, movierates.minr, movierates.cntu from(SELECT ratings.product, max(ratings.rating) as maxr, min(ratings.rating) as minr,count(distinct user) as cntu FROM ratings group by ratings.product order by cntu desc) movierates join movies on movierates.product=movies.movieId ) scala results.take(30).foreach(println) [Right Stuff, The (1983),5.0,1.0,750] [Lost in Space (1998),5.0,1.0,667] [Dumb Dumber (1994),5.0,1.0,660] [Patch Adams (1998),5.0,1.0,474] [Carlito's Way (1993),5.0,1.0,369] [Rounders (1998),5.0,1.0,345] [Bedknobs and Broomsticks (1971),5.0,1.0,319] [Beverly Hills Ninja (1997),5.0,1.0,232] [Saving Grace (2000),5.0,1.0,186] [Dangerous Minds (1995),5.0,1.0,141] [Death Wish II (1982),5.0,1.0,85] [All Dogs Go to Heaven 2 (1996),5.0,1.0,75] [Repossessed (1990),4.0,1.0,53] [Assignment, The (1997),5.0,1.0,49] [$1,000,000 Duck (1971),5.0,1.0,37] [Stonewall (1995),5.0,1.0,20] [Dog of Flanders, A (1999),5.0,1.0,8] [Frogs for Snakes (1998),3.0,1.0,5] [It's in the Water (1998),3.0,2.0,3] [Twelve Monkeys (1995),5.0,1.0,1511] [Ransom (1996),5.0,1.0,564] [Alice in Wonderland (1951),5.0,1.0,525] [City Slickers II: The Legend of Curly's Gold (1994),5.0,1.0,392] [Eat Drink Man Woman (1994),5.0,1.0,346] [Cube (1997),5.0,1.0,233] [Omega Man, The (1971),5.0,1.0,224] [Stepmom (1998),5.0,1.0,146] [Metro (1997),5.0,1.0,100] [Death Wish 3 (1985),5.0,1.0,72] [Stalker (1979),5.0,1.0,52]
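Concretely, the query from this thread can be rewritten with the ORDER BY moved to the outermost level, so the sort is applied after the join rather than inside the subquery:

val results = sqlContext.sql("""
  SELECT movies.title, movierates.maxr, movierates.minr, movierates.cntu
  FROM (SELECT ratings.product,
               max(ratings.rating) AS maxr,
               min(ratings.rating) AS minr,
               count(DISTINCT user) AS cntu
        FROM ratings
        GROUP BY ratings.product) movierates
  JOIN movies ON movierates.product = movies.movieId
  ORDER BY movierates.cntu DESC""")

results.take(30).foreach(println)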
Re: Silly question about building Spark 1.4.1
hadoop-2.6 is supported (look for profile XML in the pom.xml file). For Hive, add -Phive -Phive-thriftserver (See http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables) for more details. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Jul 20, 2015 at 2:55 PM, Michael Segel msegel_had...@hotmail.com wrote: Sorry, Should have sent this to user… However… it looks like the docs page may need some editing? Thx -Mike Begin forwarded message: *From: *Michael Segel msegel_had...@hotmail.com *Subject: **Silly question about building Spark 1.4.1* *Date: *July 20, 2015 at 12:26:40 PM MST *To: *d...@spark.apache.org Hi, I’m looking at the online docs for building spark 1.4.1 … http://spark.apache.org/docs/latest/building-spark.html I was interested in building spark for Scala 2.11 (latest scala) and also for Hive and JDBC support. The docs say: “ To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11 property: dev/change-version-to-2.11.sh mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package “ So… Is there a reason I shouldn’t build against hadoop-2.6 ? If I want to add the Thirft and Hive support, is it possible? Looking at the Scala build, it looks like hive support is being built? (Looking at the stdout messages…) Should the docs be updated? Am I missing something? (Dean W. can confirm, I am completely brain dead. ;-) Thx -Mike PS. Yes I can probably download a prebuilt image, but I’m a glutton for punishment. ;-)
Re: Silly question about building Spark 1.4.1
In master (as well as 1.4.1) I don't see hive profile in pom.xml I do find hive-provided profile, though. FYI On Mon, Jul 20, 2015 at 1:05 PM, Dean Wampler deanwamp...@gmail.com wrote: hadoop-2.6 is supported (look for profile XML in the pom.xml file). For Hive, add -Phive -Phive-thriftserver (See http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables) for more details. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Mon, Jul 20, 2015 at 2:55 PM, Michael Segel msegel_had...@hotmail.com wrote: Sorry, Should have sent this to user… However… it looks like the docs page may need some editing? Thx -Mike Begin forwarded message: *From: *Michael Segel msegel_had...@hotmail.com *Subject: **Silly question about building Spark 1.4.1* *Date: *July 20, 2015 at 12:26:40 PM MST *To: *d...@spark.apache.org Hi, I’m looking at the online docs for building spark 1.4.1 … http://spark.apache.org/docs/latest/building-spark.html I was interested in building spark for Scala 2.11 (latest scala) and also for Hive and JDBC support. The docs say: “ To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11 property: dev/change-version-to-2.11.sh mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package “ So… Is there a reason I shouldn’t build against hadoop-2.6 ? If I want to add the Thirft and Hive support, is it possible? Looking at the Scala build, it looks like hive support is being built? (Looking at the stdout messages…) Should the docs be updated? Am I missing something? (Dean W. can confirm, I am completely brain dead. ;-) Thx -Mike PS. Yes I can probably download a prebuilt image, but I’m a glutton for punishment. ;-)
Re: Joda Time best practice?
Hi Harish, On 20 Jul 2015, at 20:37, Harish Butani rhbutani.sp...@gmail.com wrote: Hey Jan, Can you provide more details on the serialization and cache issues. My symptom is that I have a Joda DateTime on which I can call toString and getMillis without problems, but when I call getYear I get a NPE out of the internal AbstractDateTime. Totally strange but seems to align with issues others have. I am now changing the app to work with millis internally, as that seems to be a performance improvement regarding serialization anyhow. Thanks, Jan If you are looking for datetime functionality with spark-sql please consider: https://github.com/SparklineData/spark-datetime It provides a simple way to combine joda datetime expressions with spark sql. regards, Harish. On Mon, Jul 20, 2015 at 7:37 AM, algermissen1971 algermissen1...@icloud.com wrote: Hi, I am having trouble with Joda Time in a Spark application and saw by now that I am not the only one (generally seems to have to do with serialization and internal caches of the Joda Time objects). Is there a known best practice to work around these issues? Jan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Fwd: Silly question about building Spark 1.4.1
Sorry, should have sent this to user… However… it looks like the docs page may need some editing? Thx -Mike Begin forwarded message: From: Michael Segel msegel_had...@hotmail.com Subject: Silly question about building Spark 1.4.1 Date: July 20, 2015 at 12:26:40 PM MST To: d...@spark.apache.org Hi, I’m looking at the online docs for building spark 1.4.1 … http://spark.apache.org/docs/latest/building-spark.html I was interested in building spark for Scala 2.11 (latest scala) and also for Hive and JDBC support. The docs say: “ To produce a Spark package compiled with Scala 2.11, use the -Dscala-2.11 property: dev/change-version-to-2.11.sh mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package “ So… Is there a reason I shouldn’t build against hadoop-2.6 ? If I want to add the Thrift and Hive support, is it possible? Looking at the Scala build, it looks like hive support is being built? (Looking at the stdout messages…) Should the docs be updated? Am I missing something? (Dean W. can confirm, I am completely brain dead. ;-) Thx -Mike PS. Yes I can probably download a prebuilt image, but I’m a glutton for punishment. ;-)
Re: Web UI Links
I figured this out after spelunking the UI code a little. The trick is to set the SPARK_PUBLIC_DNS environment variable to the public DNS name of each server in the cluster, per node. I'm running in standalone mode, so it was just a matter of adding the setting to spark-env.sh. On Mon, Jul 20, 2015 at 9:59 AM Bob Corsaro rcors...@gmail.com wrote: I'm running a Spark cluster and I'd like to access the Spark UI from outside the LAN. The problem is all the links are to internal IP addresses. Is there any way to configure hostnames for each of the hosts in the cluster and use those for the links?
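For a standalone cluster this amounts to a one-line addition to conf/spark-env.sh on each node, with the value set per node (the hostname below is a placeholder):

    # conf/spark-env.sh -- set on every node, using that node's public DNS name
    export SPARK_PUBLIC_DNS=node1.example.com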
Re: How to restart Twitter spark stream
Thanks for the explanation. If I understand this correctly, in this approach I would actually stream everything from Twitter and perform the filtering in my application using Spark. Isn't this too much overhead if my application is interested in listening for a couple of hundred or thousand hashtags? On one side, this would be a better approach since I would not have the problem of opening new streams if the number of hashtags goes over 400, which is the Twitter limit for user stream filtering, but on the other side I'm concerned about how much it will affect application performance if I stream everything that is posted on Twitter and filter it locally. It would be great if somebody with experience on this could comment on these concerns. Thanks, Zoran On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Jorn meant something like this:

val filteredStream = twitterStream.transform(rdd => {
  val newRDD = scc.sc.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
  rdd.join(newRDD)
})

newRDD will work like a filter when you do the join. Thanks Best Regards On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic zoran.jere...@gmail.com wrote: Hi Jorn, I didn't know that it is possible to change the filter without re-opening the Twitter stream. Actually, I already asked that question earlier on Stack Overflow http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming and I got the answer that it's not possible, but it would be even better if there is some other way to add new hashtags or to remove old hashtags that a user stopped following. I guess the second request would be more difficult. However, it would be great if you could give me a short example of how to do this. I didn't understand well from your explanation what you mean by "join it with an RDD loading the newest hashtags from disk at a regular interval". Thanks, Zoran On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke jornfra...@gmail.com wrote: Why do you even want to stop it? You can join it with an RDD loading the newest hashtags from disk at a regular interval. On Sun, Jul 19, 2015 at 7:40 AM, Zoran Jeremic zoran.jere...@gmail.com wrote: Hi, I have a Twitter Spark stream initialized in the following way:

val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
val config = getTwitterConfigurationBuilder.build()
val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
val stream = TwitterUtils.createStream(ssc, auth, filters)

This works fine when I initially start it. However, at some point I need to update the filters since users might add new hashtags they want to follow. I tried to stop the running stream and the Spark streaming context without stopping the Spark context, e.g.:

stream.stop()
ssc.stop(false)

Afterward, I'm trying to initialize a new Twitter stream like I did previously.
However, I got this exception:

Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
  at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
  at org.apache.spark.streaming.dstream.DStream.init(DStream.scala:64)
  at org.apache.spark.streaming.dstream.InputDStream.init(InputDStream.scala:41)
  at org.apache.spark.streaming.dstream.ReceiverInputDStream.init(ReceiverInputDStream.scala:41)
  at org.apache.spark.streaming.twitter.TwitterInputDStream.init(TwitterInputDStream.scala:46)
  at org.apache.spark.streaming.twitter.TwitterUtils$.createStream(TwitterUtils.scala:44)
  at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.initializeNewStream(TwitterHashtagsStreamsManager.scala:113)
  at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.restartHashTagsStream(TwitterHashtagsStreamsManager.scala:174)
  at org.prosolo.bigdata.scala.twitter.TwitterHashtagsStreamsManager$.addHashTagsFromBufferAndRestartStream(TwitterHashtagsStreamsManager.scala:162)
  at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$.processBufferEvents(HashtagsUpdatesBuffer.scala:41)
  at org.prosolo.bigdata.scala.twitter.HashtagsUpdatesBuffer$$anon$1.run(HashtagsUpdatesBuffer.scala:19)
  at java.util.TimerThread.mainLoop(Timer.java:555)
  at java.util.TimerThread.run(Timer.java:505)

INFO [2015-07-18 22:24:23,430] [Twitter Stream consumer-1[Disposing thread]] twitter4j.TwitterStreamImpl (SLF4JLogger.java:83) Inflater has been closed
ERROR [2015-07-18 22:24:32,503] [sparkDriver-akka.actor.default-dispatcher-3] streaming.receiver.ReceiverSupervisorImpl (Logging.scala:75) Error stopping receiver 0
  org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116)

Can anybody explain how to solve this?
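For reference, a slightly fuller Scala sketch of the join-based filtering Jörn and Akhil describe, assuming the stream is first keyed by hashtag (the file path, helper name, and "one hashtag per line" file format are illustrative, not from the thread). Because transform() is evaluated on every batch, the hashtag file is re-read at each interval, so editing the file changes the effective filter without stopping the StreamingContext:

    import org.apache.spark.streaming.dstream.DStream
    import twitter4j.Status

    // Assumed: tweets is the DStream[Status] returned by TwitterUtils.createStream.
    def filterByHashtagFile(tweets: DStream[Status]): DStream[(String, Status)] = {
      // Key each status by every hashtag it contains.
      val byTag: DStream[(String, Status)] =
        tweets.flatMap(s => s.getHashtagEntities.map(h => (h.getText.toLowerCase, s)))

      // transform() runs per batch, so the file is re-read every interval.
      byTag.transform { rdd =>
        val wanted = rdd.sparkContext
          .textFile("/path/to/hashtags.txt")   // hypothetical path, one hashtag per line (no leading '#')
          .map(tag => (tag.trim.toLowerCase, ()))
        rdd.join(wanted).mapValues { case (status, _) => status }
      }
    }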
Re: Spark-hive parquet schema evolution
I'm new to Spark; any ideas would be much appreciated! Thanks On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I'm aware of the support for schema evolution via the DataFrame API. Just wondering what would be the best way to go about dealing with schema evolution for Hive metastore tables. So, say I create a table via the Spark SQL CLI, how would I deal with Parquet schema evolution? Thanks, J
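For context, the DataFrame-side schema-merging support mentioned above looks roughly like this (a sketch based on the Spark SQL schema-merging documentation; the path is hypothetical, the mergeSchema option should be verified against the version in use, and this covers plain Parquet directories rather than the Hive-metastore case being asked about):

    // Assumed: sqlContext is an existing SQLContext; /data/events holds Parquet files
    // written over time with compatible but evolving schemas.
    val merged = sqlContext.read
      .option("mergeSchema", "true")   // merge the schemas of all part files
      .parquet("/data/events")
    merged.printSchema()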
Spark 1.4.1,MySQL and DataFrameReader.read.jdbc fun
I have Spark 1.4.1, running on a YARN cluster. When I start pyspark in yarn-client mode: pyspark --jars ~/dev/spark/lib/mysql-connector-java-5.1.36-bin.jar --driver-class-path ~/dev/spark/lib/mysql-connector-java-5.1.36-bin.jar and then do the equivalent of: tbl = sqlContext.read.jdbc("jdbc:mysql://...", tableName, properties={"user": "blah", "password": "pw"}) I get the "No suitable driver found" error when I attempt to do a tbl.show() or a tbl.describe(), etc. This happens in the spark-shell too. Currently I do NOT use SPARK_CLASSPATH (as I've seen that talked about and know that it is deprecated). I also do NOT set the spark.executor.extraClassPath property, because I thought that was the whole point of the --jars option. So, do I need to deploy the mysql connector to a known location on my YARN node managers, and then reference that JAR location someplace? If so, what command-line options do I use, or properties do I set? I thought the --jars option put the JARs onto the class path to be used; is this not the case? Another question: why do I need --driver-class-path with the location of the mysql jar? If I don't use this option, I get an error just attempting to do the sqlContext.read.jdbc() assignment, not even trying to perform an operation on the RDD. Cheers, Aaron
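If the goal is simply to make the connector visible on both the driver and executor class paths without SPARK_CLASSPATH, one commonly used alternative is the extraClassPath properties; this is a hedged sketch, not from the thread, and the path is a placeholder that must exist on every node:

    # conf/spark-defaults.conf (path is hypothetical and must be valid on each node)
    spark.driver.extraClassPath    /opt/jars/mysql-connector-java-5.1.36-bin.jar
    spark.executor.extraClassPath  /opt/jars/mysql-connector-java-5.1.36-bin.jar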
Re: Counting distinct values for a key?
Hi Jerry, In fact, the HashSet approach is what we took earlier. However, this did not work with a windowed DStream (i.e. if we provide a forward and inverse reduce operation). The reason is that the inverse reduce tries to remove values that may still exist elsewhere in the window and should not have been removed. We discovered the logic error recently; basically this is not an invertible function. I thought of doing this without an inverse reduce function, but even over a 15 minute window this is going to be an expensive operation. Instead, I have put in place a solution using a HashMap where I keep the actual counts around, and in the inverse reduce we decrement the counts. At the end we remove the keys for which the value is 0 and then take the size of the map (in terms of how many keys it has with non-zero counts). This gives us the distinct count. I was hoping that there is a more straightforward way of doing this within Spark itself without having to resort to a hack like this. Thanks Nikunj On Sun, Jul 19, 2015 at 6:13 PM, Jerry Lam chiling...@gmail.com wrote: Hi Nikunj, Sorry, I totally misread your question. I think you need to first groupByKey (get all values of the same key together), then follow with mapValues (probably put the values into a set and then take the size of it, because you want a distinct count). HTH, Jerry Sent from my iPhone On 19 Jul, 2015, at 8:48 pm, N B nb.nos...@gmail.com wrote: Hi Suyog, That code outputs the following:

key2 val22 : 1
key1 val1 : 2
key2 val2 : 2

while the output I want to achieve would have been (with your example):

key1 : 2
key2 : 2

because there are 2 distinct types of values for each key (regardless of their actual duplicate counts, hence the use of the DISTINCT keyword in the query equivalent). Thanks Nikunj On Sun, Jul 19, 2015 at 2:37 PM, suyog choudhari suyogchoudh...@gmail.com wrote:

public static void main(String[] args) {
  SparkConf sparkConf = new SparkConf().setAppName("CountDistinct");
  JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
  list.add(new Tuple2<String, String>("key1", "val1"));
  list.add(new Tuple2<String, String>("key1", "val1"));
  list.add(new Tuple2<String, String>("key2", "val2"));
  list.add(new Tuple2<String, String>("key2", "val2"));
  list.add(new Tuple2<String, String>("key2", "val22"));
  JavaPairRDD<String, Integer> rdd = jsc.parallelize(list).mapToPair(t -> new Tuple2<String, Integer>(t._1 + " " + t._2, 1));
  JavaPairRDD<String, Integer> rdd2 = rdd.reduceByKey((c1, c2) -> c1 + c2);
  List<Tuple2<String, Integer>> output = rdd2.collect();
  for (Tuple2<?, ?> tuple : output) {
    System.out.println(tuple._1() + " : " + tuple._2());
  }
}

On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam chiling...@gmail.com wrote: You mean this does not work? SELECT key, count(value) from table group by key On Sun, Jul 19, 2015 at 2:28 PM, N B nb.nos...@gmail.com wrote: Hello, How do I go about performing the equivalent of the following SQL clause in Spark Streaming? I will be using this on a windowed DStream.

SELECT key, count(distinct(value)) from table group by key;

so for example, given the following dataset in the table:

key | value
----+------
k1  | v1
k1  | v1
k1  | v2
k1  | v3
k1  | v3
k2  | vv1
k2  | vv1
k2  | vv2
k2  | vv2
k2  | vv2
k3  | vvv1
k3  | vvv1

the result will be:

key | count
----+------
k1  | 3
k2  | 2
k3  | 1

Thanks Nikunj
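A Scala sketch of the HashMap-based invertible window Nikunj describes, assuming a DStream of (key, value) pairs and illustrative window durations (an illustration of the idea, not code from the thread). The per-key state is a map from value to count; the inverse reduce decrements counts and drops values that reach zero, and the distinct count per key is the size of the surviving map:

    import org.apache.spark.streaming.{Minutes, Seconds}
    import org.apache.spark.streaming.dstream.DStream

    def windowedDistinctCounts(pairs: DStream[(String, String)]): DStream[(String, Int)] = {
      type Counts = Map[String, Int]

      // Forward reduce: merge per-value counts.
      def add(a: Counts, b: Counts): Counts =
        b.foldLeft(a) { case (m, (v, c)) => m.updated(v, m.getOrElse(v, 0) + c) }

      // Inverse reduce: decrement counts for values leaving the window, dropping zeros.
      def subtract(a: Counts, b: Counts): Counts =
        b.foldLeft(a) { case (m, (v, c)) => m.updated(v, m.getOrElse(v, 0) - c) }
          .filter { case (_, c) => c > 0 }

      // NOTE: the invertible form of reduceByKeyAndWindow requires checkpointing to be enabled.
      pairs
        .mapValues(v => Map(v -> 1): Counts)
        .reduceByKeyAndWindow(add _, subtract _, Minutes(15), Seconds(10))
        .mapValues(_.size)
    }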
Is SPARK is the right choice for traditional OLAP query processing?
All, I would really appreciate anyone's input on this. We have a very simple, traditional OLAP query processing use case. Our use case is as follows:
1. We have a customer sales order table with data coming from an RDBMS table.
2. There are many dimension columns in the sales order table. For each of those dimensions, we have individual dimension tables that store the dimension record sets.
3. We also have some BI-like hierarchies defined for the dimension data set.
What we want for business users is as follows:
1. We want to show some aggregated values from the sales order transaction table columns.
2. Users would like to filter these with specific dimension values from the dimension tables.
3. Users should be able to drill down from a higher level to a lower level by traversing a hierarchy on a dimension.
We want these user actions to respond within 2 to 5 seconds. We are thinking about using Spark as our backend engine to serve data to these front-end applications. Has anyone tried using Spark for this kind of use case? These are all traditional use cases in the BI space. If so, can Spark respond to these queries within 2 to 5 seconds for large data sets? Thanks, Renga -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-SPARK-is-the-right-choice-for-traditional-OLAP-query-processing-tp23921.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Kmeans Labeled Point RDD
I responded to your question on SO. Let me know if this is what you wanted. http://stackoverflow.com/a/31528274/2336943 Mohammed -Original Message- From: plazaster [mailto:michaelplaz...@gmail.com] Sent: Sunday, July 19, 2015 11:38 PM To: user@spark.apache.org Subject: Re: Kmeans Labeled Point RDD Has there been any progress on this? I am in the same boat. I posted a similar question to Stack Exchange. http://stackoverflow.com/questions/31447141/spark-mllib-kmeans-from-dataframe-and-back-again -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kmeans-Labeled-Point-RDD-tp22989p23907.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Data frames select and where clause dependency
Michael, How would the Catalyst optimizer optimize this version? df.filter(df("filter_field") === "value").select("field1").show() Would it still read all the columns in df, or would it read only "filter_field" and "field1" since only two columns are used (assuming other columns from df are not used anywhere else)? Mohammed From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Friday, July 17, 2015 1:39 PM To: Mike Trienis Cc: user@spark.apache.org Subject: Re: Data frames select and where clause dependency Each operation on a dataframe is completely independent and doesn't know what operations happened before it. When you do a selection, you are removing other columns from the dataframe and so the filter has nothing to operate on. On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis mike.trie...@orcsol.com wrote: I'd like to understand why the where field must exist in the select clause. For example, the following select statement works fine: * df.select("field1", "filter_field").filter(df("filter_field") === "value").show() However, the next one fails with the error in operator !Filter (filter_field#60 = value); * df.select("field1").filter(df("filter_field") === "value").show() As a work-around, it seems that I can do the following: * df.select("field1", "filter_field").filter(df("filter_field") === "value").drop("filter_field").show() Thanks, Mike.
Re: Data frames select and where clause dependency
Yes, via org.apache.spark.sql.catalyst.optimizer.ColumnPruning. See DefaultOptimizer.batches for the list of logical rewrites. You can see the optimized plan by printing: df.queryExecution.optimizedPlan On Mon, Jul 20, 2015 at 5:22 PM, Mohammed Guller moham...@glassbeam.com wrote: Michael, How would the Catalyst optimizer optimize this version? df.filter(df("filter_field") === "value").select("field1").show() Would it still read all the columns in df, or would it read only "filter_field" and "field1" since only two columns are used (assuming other columns from df are not used anywhere else)? Mohammed *From:* Michael Armbrust [mailto:mich...@databricks.com] *Sent:* Friday, July 17, 2015 1:39 PM *To:* Mike Trienis *Cc:* user@spark.apache.org *Subject:* Re: Data frames select and where clause dependency Each operation on a dataframe is completely independent and doesn't know what operations happened before it. When you do a selection, you are removing other columns from the dataframe and so the filter has nothing to operate on. On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis mike.trie...@orcsol.com wrote: I'd like to understand why the where field must exist in the select clause. For example, the following select statement works fine: - df.select("field1", "filter_field").filter(df("filter_field") === "value").show() However, the next one fails with the error in operator !Filter (filter_field#60 = value); - df.select("field1").filter(df("filter_field") === "value").show() As a work-around, it seems that I can do the following: - df.select("field1", "filter_field").filter(df("filter_field") === "value").drop("filter_field").show() Thanks, Mike.
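As a concrete illustration of the pointers above (column and value names are taken from the earlier example), the optimized plan can be printed directly, and after rules such as ColumnPruning it should show only the two referenced columns being read:

    // Assumed: df is the DataFrame from the example above.
    val pruned = df.filter(df("filter_field") === "value").select("field1")
    println(pruned.queryExecution.optimizedPlan)  // optimized logical plan
    pruned.explain(true)                          // parsed, analyzed, optimized and physical plans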
RE: Data frames select and where clause dependency
Thanks, Harish. Mike – this would be a cleaner version for your use case: df.filter(df("filter_field") === "value").select("field1").show() Mohammed From: Harish Butani [mailto:rhbutani.sp...@gmail.com] Sent: Monday, July 20, 2015 5:37 PM To: Mohammed Guller Cc: Michael Armbrust; Mike Trienis; user@spark.apache.org Subject: Re: Data frames select and where clause dependency Yes, via org.apache.spark.sql.catalyst.optimizer.ColumnPruning. See DefaultOptimizer.batches for the list of logical rewrites. You can see the optimized plan by printing: df.queryExecution.optimizedPlan On Mon, Jul 20, 2015 at 5:22 PM, Mohammed Guller moham...@glassbeam.com wrote: Michael, How would the Catalyst optimizer optimize this version? df.filter(df("filter_field") === "value").select("field1").show() Would it still read all the columns in df, or would it read only "filter_field" and "field1" since only two columns are used (assuming other columns from df are not used anywhere else)? Mohammed From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Friday, July 17, 2015 1:39 PM To: Mike Trienis Cc: user@spark.apache.org Subject: Re: Data frames select and where clause dependency Each operation on a dataframe is completely independent and doesn't know what operations happened before it. When you do a selection, you are removing other columns from the dataframe and so the filter has nothing to operate on. On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis mike.trie...@orcsol.com wrote: I'd like to understand why the where field must exist in the select clause. For example, the following select statement works fine: * df.select("field1", "filter_field").filter(df("filter_field") === "value").show() However, the next one fails with the error in operator !Filter (filter_field#60 = value); * df.select("field1").filter(df("filter_field") === "value").show() As a work-around, it seems that I can do the following: * df.select("field1", "filter_field").filter(df("filter_field") === "value").drop("filter_field").show() Thanks, Mike.
Re: Data frames select and where clause dependency
Definitely, thanks Mohammed. On Mon, Jul 20, 2015 at 5:47 PM, Mohammed Guller moham...@glassbeam.com wrote: Thanks, Harish. Mike – this would be a cleaner version for your use case: df.filter(df("filter_field") === "value").select("field1").show() Mohammed *From:* Harish Butani [mailto:rhbutani.sp...@gmail.com] *Sent:* Monday, July 20, 2015 5:37 PM *To:* Mohammed Guller *Cc:* Michael Armbrust; Mike Trienis; user@spark.apache.org *Subject:* Re: Data frames select and where clause dependency Yes, via org.apache.spark.sql.catalyst.optimizer.ColumnPruning. See DefaultOptimizer.batches for the list of logical rewrites. You can see the optimized plan by printing: df.queryExecution.optimizedPlan On Mon, Jul 20, 2015 at 5:22 PM, Mohammed Guller moham...@glassbeam.com wrote: Michael, How would the Catalyst optimizer optimize this version? df.filter(df("filter_field") === "value").select("field1").show() Would it still read all the columns in df, or would it read only "filter_field" and "field1" since only two columns are used (assuming other columns from df are not used anywhere else)? Mohammed *From:* Michael Armbrust [mailto:mich...@databricks.com] *Sent:* Friday, July 17, 2015 1:39 PM *To:* Mike Trienis *Cc:* user@spark.apache.org *Subject:* Re: Data frames select and where clause dependency Each operation on a dataframe is completely independent and doesn't know what operations happened before it. When you do a selection, you are removing other columns from the dataframe and so the filter has nothing to operate on. On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis mike.trie...@orcsol.com wrote: I'd like to understand why the where field must exist in the select clause. For example, the following select statement works fine: - df.select("field1", "filter_field").filter(df("filter_field") === "value").show() However, the next one fails with the error in operator !Filter (filter_field#60 = value); - df.select("field1").filter(df("filter_field") === "value").show() As a work-around, it seems that I can do the following: - df.select("field1", "filter_field").filter(df("filter_field") === "value").drop("filter_field").show() Thanks, Mike.
Re: Increment counter variable in RDD transformation function
Please see http://spark.apache.org/docs/latest/programming-guide.html#local-vs-cluster-modes Cheers On Mon, Jul 20, 2015 at 3:21 PM, dlmar...@comcast.net wrote: I'm trying to keep track of some information in a RDD.flatMap() function (using the Java API in 1.4.0). I have two longs in the function, and I am incrementing them when appropriate, and checking their values to determine how many objects to output from the function. I'm not trying to read the values in the driver or use them as a global counter, just trying to count within the task. This appears not to be working. Should I expect this to work? If so, any pointers as to what I might be doing wrong? Code looks something like:

JavaRDD<LabeledPoint> parsedData = data.map(new Function<String, LabeledPoint>() {
  Long count = 0L;
  public LabeledPoint call(String line) {
    count++;
    String[] parts = line.split(",");
    String[] features = parts[1].split(" ");
    double[] v = new double[features.length];
    for (int i = 0; i < features.length - 1; i++)
      v[i] = Double.parseDouble(features[i]);
    if (count == 50) {
      // return something else
    }
    return new LabeledPoint(Double.parseDouble(parts[0]), Vectors.dense(v));
  }
});
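As an aside to the linked closures discussion, a counter whose scope is one partition is well defined when it lives inside the function passed to mapPartitions, since that function body runs once per partition within a single task. A hedged Scala sketch of the idea follows (the parsing is simplified relative to the Java snippet above, and the input format of "label,f1 f2 f3 ..." lines is assumed):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    def parseWithPartitionCounter(data: RDD[String]): RDD[LabeledPoint] =
      data.mapPartitions { lines =>
        var count = 0L                 // local to this partition's task
        lines.map { line =>
          count += 1                   // incremented as this partition's records are consumed
          val parts = line.split(",")
          val features = parts(1).split(" ").map(_.toDouble)
          // count can be consulted here, e.g. to special-case every 50th record in the partition
          LabeledPoint(parts(0).toDouble, Vectors.dense(features))
        }
      }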
spark streaming 1.3 coalesce on kafkadirectstream
Does Spark Streaming 1.3 launch a task for each partition offset range, whether that range is empty or not? If yes, how can I stop it from launching tasks for empty RDDs? I am not able to use coalesce on directKafkaStream. Should we always repartition before processing the direct stream? The use case is:

directKafkaStream.repartition(numexecutors).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, String>() {
  ...
})

Thanks
standalone to connect mysql
Hi there, I would like to use Spark to access data in MySQL. So first I tried to run the program using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar which returns the correct results. Then I tried the standalone version using: spark-submit --class sparkwithscala.SqlApp --driver-class-path /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077 /home/myjar.jar (I have mysql-connector-java-5.1.34.jar on all worker nodes.) and the error is: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129): java.sql.SQLException: No suitable driver found for jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=root&password=root I also found a similar problem reported earlier at https://jira.talendforge.org/browse/TBD-2244. Is this a bug to be fixed later? Or am I missing something? Best regards, Jack