Re: Spark Streaming in Production
Run the Spark cluster managed by Apache Mesos. Mesos can run in high-availability mode, in which multiple Mesos masters run simultaneously. - Software Developer SigmoidAnalytics, Bangalore -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-Production-tp20644p20651.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Access to s3 from spark
Try any one of the following: *1. Set the access key and secret key in the sparkContext:* sparkContext.set("AWS_ACCESS_KEY_ID", yourAccessKey) sparkContext.set("AWS_SECRET_ACCESS_KEY", yourSecretKey) *2. Set the access key and secret key in the environment before starting your application:* export AWS_ACCESS_KEY_ID=your access export AWS_SECRET_ACCESS_KEY=your secret *3. Set the access key and secret key inside the hadoop configurations:* val hadoopConf = sparkContext.hadoopConfiguration hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") hadoopConf.set("fs.s3.awsAccessKeyId", yourAccessKey) hadoopConf.set("fs.s3.awsSecretAccessKey", yourSecretKey) - Software Developer SigmoidAnalytics, Bangalore -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Access-to-s3-from-spark-tp20631p20654.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
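For reference, a minimal sketch of option 3 above (setting the keys on the Hadoop configuration and then reading from S3). It assumes sparkContext is an existing SparkContext (e.g. sc in the shell); the key values, bucket and path are placeholders, and the fs.s3.* property names follow the message above:

  val hadoopConf = sparkContext.hadoopConfiguration
  hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
  hadoopConf.set("fs.s3.awsAccessKeyId", "YOUR_ACCESS_KEY")          // placeholder
  hadoopConf.set("fs.s3.awsSecretAccessKey", "YOUR_SECRET_KEY")      // placeholder
  val lines = sparkContext.textFile("s3://your-bucket/path/to/data") // placeholder bucket and path
  println(lines.count())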
Re: Exception using amazonaws library
Its a jar conflict (http-client http://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient/4.0-alpha4 jar), You could download the appropriate version of that jar and put that in the classpath before your assembly jar and hopefully it will avoid the conflict. Thanks Best Regards On Thu, Dec 11, 2014 at 10:13 PM, Albert Manyà alber...@eml.cc wrote: Hi, I've made a simple script in scala that after doing a spark sql query it sends the result to AWS's cloudwatch. I've tested both parts individually (the spark sql one and the cloudwatch one) and they worked fine. The trouble comes when I execute the script through spark-submit that gives me the following exception: Exception in thread main java.lang.NoSuchMethodError: org.apache.http.params.HttpConnectionParams.setSoKeepalive(Lorg/apache/http/params/HttpParams;Z)V at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:95) at com.amazonaws.http.AmazonHttpClient.init(AmazonHttpClient.java:193) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:120) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:104) at com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.init(AmazonCloudWatchClient.java:171) at com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.init(AmazonCloudWatchClient.java:152) at com.scmspain.synapse.SynapseMonitor$.sendMetrics(SynapseMonitor.scala:50) at com.scmspain.synapse.SynapseMonitor$.main(SynapseMonitor.scala:45) at com.scmspain.synapse.SynapseMonitor.main(SynapseMonitor.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) my class is com.scmspain.synapse.SynapseMonitor I've build my script with sbt assembly, having the following dependencies: libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.1.0 % provided, org.apache.spark %% spark-sql % 1.1.0 % provided, com.amazonaws % aws-java-sdk-cloudwatch % 1.9.10 ) I've unzipped the generated jar assembly and searched for the HttpConnectionParams.class and I've found it out under org/apache/http/params and having the following signature for setSoKeepalive: public static void setSoKeepalive(HttpParams params, boolean enableKeepalive) At this point I'm stuck and didn't know where to keep looking... some help would be greatly appreciated :) Thank you very much! -- Albert Manyà alber...@eml.cc - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
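If hand-placing the jar, one way to try putting it ahead of the assembly is through the class-path options of spark-submit — a sketch only, with the jar path, httpclient version and assembly jar name as placeholders (whether 4.3.x is the right httpclient version for aws-java-sdk 1.9.10 should be checked against its POM, and whether this wins over the jars already on the Spark classpath depends on the deployment):

  ./bin/spark-submit \
    --class com.scmspain.synapse.SynapseMonitor \
    --driver-class-path /path/to/httpclient-4.3.6.jar \
    --conf spark.executor.extraClassPath=/path/to/httpclient-4.3.6.jar \
    target/scala-2.10/your-assembly.jar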
Serialization issue when using HBase with Spark
The scenario is using an HTable instance to scan multiple rowkey ranges in Spark tasks, which looks like below: Option 1: val users = input .map { case (deviceId, uid) => uid }.distinct().sortBy(x => x).mapPartitions(iterator => { val conf = HBaseConfiguration.create() val table = new HTable(conf, "actions") val result = iterator.map { userId => (userId, getUserActions(table, userId, timeStart, timeStop)) } table.close() result }) But got the exception: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)... ... Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) The reason for not using sc.newAPIHadoopRDD is that it only supports one scan each time. val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) And when using MultiTableInputFormat, it is not possible for the driver to put all rowkeys into the HBaseConfiguration: Option 2: sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) It may be possible to divide all rowkey ranges into several parts and then use option 2, but I prefer option 1. So is there any solution for option 1? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Serialization issue when using HBase with Spark
Can you paste the complete code? it looks like at some point you are passing a hadoop's configuration which is not Serializable. You can look at this thread for similar discussion http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-td13378.html Thanks Best Regards On Fri, Dec 12, 2014 at 2:05 PM, yangliuyu yangli...@163.com wrote: The scenario is using HTable instance to scan multiple rowkey range in Spark tasks look likes below: Option 1: val users = input .map { case (deviceId, uid) = uid}.distinct().sortBy(x=x).mapPartitions(iterator={ val conf = HBaseConfiguration.create() val table = new HTable(conf, actions) val result = iterator.map{ userId= (userId, getUserActions(table, userId, timeStart, timeStop)) } table.close() result }) But got the exception: org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) at org.apache.spark.SparkContext.clean(SparkContext.scala:1264) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)... ... Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) The reason not using sc.newAPIHadoopRDD is it only support one scan each time. val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) And if using MultiTableInputFormat, driver is not possible put all rowkeys into HBaseConfiguration Option 2: sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) It may divide all rowkey ranges into several parts then use option 2, but I prefer option 1. So is there any solution for option 1? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
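For what it's worth, a sketch of how option 1 can be shaped so that no Hadoop Configuration ends up in the task closure and the table is only closed after the results have been materialized. It assumes getUserActions, timeStart and timeStop are serializable (e.g. plain Longs) and that the rest of the pipeline is unchanged:

  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.HTable

  val users = input
    .map { case (deviceId, uid) => uid }
    .distinct()
    .sortBy(x => x)
    .mapPartitions { iterator =>
      // Build the HBase configuration and table on the worker, inside the partition,
      // so nothing non-serializable is captured by the closure.
      val conf = HBaseConfiguration.create()
      val table = new HTable(conf, "actions")
      // iterator.map is lazy; force it before closing the table.
      val result = iterator.map { userId =>
        (userId, getUserActions(table, userId, timeStart, timeStop))
      }.toList
      table.close()
      result.iterator
    }

If the task is still reported as not serializable with this shape, the culprit is usually something referenced by getUserActions or by the enclosing class.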
Re: Exception using amazonaws library
Hi, Thanks for your reply. I tried with the jar you pointed but It complains about missing HttpPatch that appears on httpclient 4.2 Exception in thread main java.lang.NoClassDefFoundError: org/apache/http/client/methods/HttpPatch at com.amazonaws.http.AmazonHttpClient.clinit(AmazonHttpClient.java:129) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:120) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:104) at com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.init(AmazonCloudWatchClient.java:171) at com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.init(AmazonCloudWatchClient.java:152) at com.scmspain.synapse.SynapseMonitor$.sendMetrics(SynapseMonitor.scala:48) at com.scmspain.synapse.SynapseMonitor$.main(SynapseMonitor.scala:28) at com.scmspain.synapse.SynapseMonitor.main(SynapseMonitor.scala) ... I do not understand who is compiled against such an old version of httpclient, I see in the project dependencies that amazonaws 1.9.10 depends on httclient 4.3... It is spark who is compiled against an old version of amazonaws? Thanks. -- Albert Manyà alber...@eml.cc On Fri, Dec 12, 2014, at 09:27 AM, Akhil Das wrote: Its a jar conflict (http-client[1] jar), You could download the appropriate version of that jar and put that in the classpath before your assembly jar and hopefully it will avoid the conflict. Thanks Best Regards On Thu, Dec 11, 2014 at 10:13 PM, Albert Manyà alber...@eml.cc wrote: Hi, I've made a simple script in scala that after doing a spark sql query it sends the result to AWS's cloudwatch. I've tested both parts individually (the spark sql one and the cloudwatch one) and they worked fine. The trouble comes when I execute the script through spark-submit that gives me the following exception: Exception in thread main java.lang.NoSuchMethodError: org.apache.http.params.HttpConnectionParams.setSoKeepalive(Lorg/apache/http/params/HttpParams;Z)V at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:95) at com.amazonaws.http.AmazonHttpClient.init(AmazonHttpClient.java:193) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:120) at com.amazonaws.AmazonWebServiceClient.init(AmazonWebServiceClient.java:104) at com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.init(AmazonCloudWatchClient.java:171) at com.amazonaws.services.cloudwatch.AmazonCloudWatchClient.init(AmazonCloudWatchClient.java:152) at com.scmspain.synapse.SynapseMonitor$.sendMetrics(SynapseMonitor.scala:50) at com.scmspain.synapse.SynapseMonitor$.main(SynapseMonitor.scala:45) at com.scmspain.synapse.SynapseMonitor.main(SynapseMonitor.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) my class is com.scmspain.synapse.SynapseMonitor I've build my script with sbt assembly, having the following dependencies: libraryDependencies ++= Seq( org.apache.spark %% spark-core % 1.1.0 % provided, org.apache.spark %% spark-sql % 1.1.0 % provided, com.amazonaws % aws-java-sdk-cloudwatch % 1.9.10 ) I've unzipped the generated jar assembly and searched for the HttpConnectionParams.class and I've 
found it out under org/apache/http/params and having the following signature for setSoKeepalive: public static void setSoKeepalive(HttpParams params, boolean enableKeepalive) At this point I'm stuck and didn't know where to keep looking... some help would be greatly appreciated :) Thank you very much! -- Albert Manyà alber...@eml.cc - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org Links: 1. http://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient/4.0-alpha4
Re: Adding a column to a SchemaRDD
RDD is immutable so you cannot modify it. If you want to modify some value or the schema in an RDD, use map to generate a new RDD. The following code is for your reference: def add(a: Int, b: Int): Int = { a + b } val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) } val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2)) } d2.foreach(println) Otherwise, if your user-defined function is straightforward and you can represent it in SQL, using Spark SQL or the DSL is also a good choice. case class Person(id: Int, score: Int, value: Int) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val d1 = sc.parallelize(1 to 10).map { i => Person(i, i+1, i+2) } val d2 = d1.select('id, 'score, 'id + 'score) d2.foreach(println) 2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld nkronenf...@oculusinfo.com: Hi, there. I'm trying to understand how to augment data in a SchemaRDD. I can see how to do it if I can express the added values in SQL - just run SELECT *, valueCalculation AS newColumnName FROM table. I've been searching all over for how to do this if my added value is a Scala function, with no luck. Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a new column, D, calculated using Utility.process(b, c), and I want (of course) to pass in the values B and C from each row, ending up with a new SchemaRDD with columns A, B, C, and D. Is this possible? If so, how? Thanks, -Nathan -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
[Graphx] the communication cost of leftJoin
Hi, I am trying to leftJoin another vertex RDD (e.g. vB) with this one (vA): vA.leftJoin(vB)(f) - vA is the vertex RDD in graph G, and G is edge-partitioned using EdgePartition2D. - vB is created using the default partitioner (actually I am not sure...). So I am wondering: if vB has the same partitioner as vA, what will GraphX (Spark) do to handle this case? For instance, as below: 1) check the partitioner of vB; 2) do the leftJoin operations on each machine separately, for the co-located partitions of vA and vB. Right? But if vB's partitioner is different, what will happen? How do they communicate between partitions (and machines)? Does anyone have some pointers on this, or on communication between RDDs? Thanks :) Best, Yifan LI
Spark CDH5 packages
Hi, I'm new to this list, so please excuse me if I'm asking simple questions. We are experimenting with Spark deployment on existing CDH clusters. However, the Spark packages that come with CDH are very out of date (v1.0.0). Has anyone had experience with a custom Spark upgrade for CDH5? Any installation or packaging recommendations would be appreciated. The download page and documentation site only mention a CDH4 pre-built package. Thanks Jing - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark CDH5 packages
No, CDH 5.2 includes Spark 1.1 actually, which is the latest released minor version; 5.3 will include 1.2, which is not released yet. You can make a build of just about any version of Spark for CDH5 and manually install it yourself, sure, but it would be easier to just update CDH. The instructions are the ones you've found; all you need to know is to use the hadoop-2.4 profile (which is for 2.5 and 2.6 too) and set hadoop.version appropriately. On Fri, Dec 12, 2014 at 12:26 PM, Jing Dong j...@qubitdigital.com wrote: Hi, I'm new to this list, so please excuse me if I'm asking simple questions. We are experimenting with Spark deployment on existing CDH clusters. However, the Spark packages that come with CDH are very out of date (v1.0.0). Has anyone had experience with a custom Spark upgrade for CDH5? Any installation or packaging recommendations would be appreciated. The download page and documentation site only mention a CDH4 pre-built package. Thanks Jing - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
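For example, a build command along those lines might look like the following — the CDH version string is just an illustration, substitute the one your cluster actually runs:

  mvn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.2.0 -DskipTests clean package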
Read data from SparkStreaming from Java socket.
Hi, I'm a newbie with Spark. I'm just trying to use SparkStreaming and filter some data sent with a Java socket, but it's not working... it works when I use ncat. Why is it not working?? My Spark code is just this: val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Test") val ssc = new StreamingContext(sparkConf, Seconds(5)) val lines = ssc.socketTextStream("localhost", ) val errorLines = lines.filter(_.contains("hello")) errorLines.print() I created a client socket which sends data to that port, but it could connect to any address. I guess that Spark doesn't work like a ServerSocket... what's the way to send data from a socket with Java to be able to read it from socketTextStream?? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
...FileNotFoundException: Path is not a file: - error on accessing HDFS with sc.wholeTextFiles
When I try to load a text file from a HDFS path using sc.wholeTextFiles(hdfs://localhost:54310/graphx/anywebsite.com/anywebsite.com/) I'm get the following error: java.io.FileNotFoundException: Path is not a file: /graphx/anywebsite.com/anywebsite.com/css (full stack trace at bottom of message). If I switch my Scala code to reading the input file from the local disk, wholeTextFiles doesn't pickup directories (such as css in this case) and there is no exception raised. The trace information in the 'local file' version shows that only plain text files are collected with sc.wholeTextFiles: 14/12/12 11:51:29 INFO WholeTextFileRDD: Input split: Paths:/tmp/anywebsite.com/anywebsite.com/index-2.html:0+6192,/tmp/anywebsite.com/anywebsite.com/gallery.html:0+3258,/tmp/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/tmp/anywebsite.com/anywebsite.com/jquery.html:0+326,/tmp/anywebsite.com/anywebsite.com/index.html:0+6174,/tmp/anywebsite.com/anywebsite.com/contact.html:0+3050,/tmp/anywebsite.com/anywebsite.com/archive.html:0+3247 Yet the trace information in the 'HDFS file' version shows directories too are collected with sc.wholeTextFiles: 14/12/12 11:49:07 INFO WholeTextFileRDD: Input split: Paths:/graphx/anywebsite.com/anywebsite.com/archive.html:0+3247,/graphx/anywebsite.com/anywebsite.com/contact.html:0+3050,/graphx/anywebsite.com/anywebsite.com/css:0+0,/graphx/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/graphx/anywebsite.com/anywebsite.com/gallery.html:0+3258,/graphx/anywebsite.com/anywebsite.com/highslide:0+0,/graphx/anywebsite.com/anywebsite.com/highslideIndex:0+0,/graphx/anywebsite.com/anywebsite.com/images:0+0,/graphx/anywebsite.com/anywebsite.com/index-2.html:0+6192,/graphx/anywebsite.com/anywebsite.com/index.html:0+6174,/graphx/anywebsite.com/anywebsite.com/jquery.html:0+326,/graphx/anywebsite.com/anywebsite.com/js:0+0 14/12/12 11:49:07 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.io.FileNotFoundException: Path is not a file: /graphx/anywebsite.com/anywebsite.com/css at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54) Should the HDFS version behave the same as the local version of wholeTextFiles as far as the treatment of directories/non plain text files are concerned ? 
Any help, advice or workaround suggestions would be much appreciated, Thanks Karen VERSION INFO Ubuntu 14.04 Spark 1.1.1 Hadoop 2.5.2 Scala 2.10.4 FULL STACK TRACE 14/12/12 12:02:31 INFO WholeTextFileRDD: Input split: Paths:/graphx/anywebsite.com/anywebsite.com/archive.html:0+3247,/graphx/anywebsite.com/anywebsite.com/contact.html:0+3050,/graphx/anywebsite.com/anywebsite.com/css:0+0,/graphx/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/graphx/anywebsite.com/anywebsite.com/gallery.html:0+3258,/graphx/anywebsite.com/anywebsite.com/highslide:0+0,/graphx/anywebsite.com/anywebsite.com/highslideIndex:0+0,/graphx/anywebsite.com/anywebsite.com/images:0+0,/graphx/anywebsite.com/anywebsite.com/index-2.html:0+6192,/graphx/anywebsite.com/anywebsite.com/index.html:0+6174,/graphx/anywebsite.com/anywebsite.com/jquery.html:0+326,/graphx/anywebsite.com/anywebsite.com/js:0+0 14/12/12 12:02:31 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.io.FileNotFoundException: Path is not a file: /graphx/anywebsite.com/anywebsite.com/css at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:519) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:337) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at
Unit testing and Spark Streaming
Hi, I’ve started my first experiments with Spark Streaming and started with setting up an environment using ScalaTest to do unit testing. Poked around on this mailing list and googled the topic. One of the things I wanted to be able to do is to use Scala Sequences as data source in the tests (instead of using files for example). For this, queueStream on a StreamingContext came in handy. I now have a setup that allows me to run WordSpec style tests like in: class StreamTests extends StreamingContextBaseSpec(Some-tests) with Matchers with WordsCountsTestData { Running word count should { produce the correct word counts for a non-empty list of words in { val streamingData = injectData(data1) val wordCountsStream = WordCounter.wordCounter(streamingData) val wordCounts = startStreamAndExtractResult(wordCountsStream, ssc) val sliceSet = wordCounts.toSet wordCounts.toSet shouldBe wordCounts1 } return count = 1 for the empty string in { val streamingData: InputDStream[String] = injectData(data2) val wordCountsStream: DStream[(String, Int)] = WordCounter.wordCounter(streamingData) val wordCounts: Seq[(String, Int)] = startStreamAndExtractResult(wordCountsStream, ssc) wordCounts.toSet shouldBe wordCounts2 } return an empty result for an empty list of words in { val streamingData = injectData(data3) val wordCountsStream = WordCounter.wordCounter(streamingData) val wordCounts = startStreamAndExtractResult(wordCountsStream, ssc) wordCounts.toSet shouldBe wordCounts3 } } Running word count with filtering out words with single occurrence should { produce the correct word counts for a non-empty list of words in { val streamingData = injectData(data1) val wordCountsStream = WordCounter.wordCountOverOne(streamingData) val wordCounts = startStreamAndExtractResult(wordCountsStream, ssc) wordCounts.toSet shouldBe wordCounts1.filter(_._2 1) } } } where WordsCountsTestData (added at the end of this message) is a trait that contains the test data and the correct results. The two methods under test in the above test code (WordCounter.wordCounter and WordCounter.wordCountOverOne) are: object WordCounter { def wordCounter(input: InputDStream[String]): DStream[(String, Int)] = { val pairs = input.map(word = (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts } def wordCountOverOne(input: InputDStream[String]): DStream[(String, Int)] = { val pairs = input.map(word = (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts filter (_._2 1) } } StreamingContextBaseSpec contains the actual test helper methods such as injectData and startStreamAndExtractResult. 
package spark.testing import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} import org.scalatest.{BeforeAndAfter, WordSpec} import scala.collection.mutable.Queue import scala.reflect.ClassTag class StreamingContextBaseSpec(name: String, silenceSpark : Boolean = true) extends WordSpec with BeforeAndAfter { val BatchDuration = 10 // milliseconds val DeltaTBefore = 20 * BatchDuration val DeltaTAfter = 10 * BatchDuration def injectData[T: ClassTag](data: Seq[T]): InputDStream[T] = { val dataAsRDD = ssc.sparkContext.parallelize(data) val dataAsRDDOnQueue = Queue(dataAsRDD) ssc.queueStream(dataAsRDDOnQueue, oneAtATime = false) } def startStreamAndExtractResult[T: ClassTag](stream: DStream[T], ssc: StreamingContext): Seq[T] = { stream.print() println(s~~~ starting execution context $ssc) val sTime = System.currentTimeMillis() ssc.start() val startWindow = new Time(sTime - DeltaTBefore) val endWindow = new Time(sTime + DeltaTAfter) val sliceRDDs = stream.slice(startWindow, endWindow) sliceRDDs.map(rdd = rdd.collect()).flatMap(data = data.toVector) } var ssc: StreamingContext = _ before { System.clearProperty(spark.driver.port) System.clearProperty(spark.driver.host) if ( silenceSpark ) SparkUtil.silenceSpark() val conf = new SparkConf().setMaster(local).setAppName(name) ssc = new StreamingContext(conf, Milliseconds(BatchDuration)) } after { println(s~~~ stopping execution context $ssc) System.clearProperty(spark.driver.port) System.clearProperty(spark.driver.host) ssc.stop(stopSparkContext = true, stopGracefully = true) ssc.awaitTermination() ssc = null } } So far for the prelude, now my questions: Is this a good way to perform this kind of testing ? Are there more efficient ways to run this kind of testing ? To reduce the test run time, I’m running the stream with a batch interval of only 10ms and a window that extends to 100ms (This seems to work fine as far as I can see. When the batch interval is
Re: Read data from SparkStreaming from Java socket.
I have created a Serversocket program which you can find over here https://gist.github.com/akhld/4286df9ab0677a555087 It simply listens to the given port and when the client connects, it will send the contents of the given file. I'm attaching the executable jar also, you can run the jar as: java -jar SocketBenchmark.jar student 12345 io Here student is the file which will be sent to the client whoever connects on 12345, i have it tested and is working with SparkStreaming (socketTextStream). Thanks Best Regards On Fri, Dec 12, 2014 at 6:25 PM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm a newbie with Spark,, I'm just trying to use SparkStreaming and filter some data sent with a Java Socket but it's not working... it works when I use ncat Why is it not working?? My sparkcode is just this: val sparkConf = new SparkConf().setMaster(local[2]).setAppName(Test) val ssc = new StreamingContext(sparkConf, Seconds(5)) val lines = ssc.socketTextStream(localhost, ) val errorLines = lines.filter(_.contains(hello)) errorLines.print() I created a client socket which sends data to that port, but it could connect any address, I guess that Spark doesn't work like a serverSocket... what's the way to send data from a socket with Java to be able to read from socketTextStream?? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org SocketBenchmark.jar Description: application/java-archive - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: ...FileNotFoundException: Path is not a file: - error on accessing HDFS with sc.wholeTextFiles
I'm not quiet sure whether spark will go inside subdirectories and pick up files from it. You could do something like following to bring all files to one directory. find . -iname '*' -exec mv '{}' . \; Thanks Best Regards On Fri, Dec 12, 2014 at 6:34 PM, Karen Murphy k.l.mur...@qub.ac.uk wrote: When I try to load a text file from a HDFS path using sc.wholeTextFiles(hdfs://localhost:54310/graphx/ anywebsite.com/anywebsite.com/) I'm get the following error: java.io.FileNotFoundException: Path is not a file: /graphx/ anywebsite.com/anywebsite.com/css (full stack trace at bottom of message). If I switch my Scala code to reading the input file from the local disk, wholeTextFiles doesn't pickup directories (such as css in this case) and there is no exception raised. The trace information in the 'local file' version shows that only plain text files are collected with sc.wholeTextFiles: 14/12/12 11:51:29 INFO WholeTextFileRDD: Input split: Paths:/tmp/ anywebsite.com/anywebsite.com/index-2.html:0+6192,/tmp/anywebsite.com/anywebsite.com/gallery.html:0+3258,/tmp/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/tmp/anywebsite.com/anywebsite.com/jquery.html:0+326,/tmp/anywebsite.com/anywebsite.com/index.html:0+6174,/tmp/anywebsite.com/anywebsite.com/contact.html:0+3050,/tmp/anywebsite.com/anywebsite.com/archive.html:0+3247 Yet the trace information in the 'HDFS file' version shows directories too are collected with sc.wholeTextFiles: 14/12/12 11:49:07 INFO WholeTextFileRDD: Input split: Paths:/graphx/ anywebsite.com/anywebsite.com/archive.html:0+3247,/graphx/anywebsite.com/anywebsite.com/contact.html:0+3050,/graphx/anywebsite.com/anywebsite.com/css:0+0,/graphx/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/graphx/anywebsite.com/anywebsite.com/gallery.html:0+3258,/graphx/anywebsite.com/anywebsite.com/highslide:0+0,/graphx/anywebsite.com/anywebsite.com/highslideIndex:0+0,/graphx/anywebsite.com/anywebsite.com/images:0+0,/graphx/anywebsite.com/anywebsite.com/index-2.html:0+6192,/graphx/anywebsite.com/anywebsite.com/index.html:0+6174,/graphx/anywebsite.com/anywebsite.com/jquery.html:0+326,/graphx/anywebsite.com/anywebsite.com/js:0+0 14/12/12 11:49:07 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.io.FileNotFoundException: Path is not a file: /graphx/ anywebsite.com/anywebsite.com/css at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54) Should the HDFS version behave the same as the local version of wholeTextFiles as far as the treatment of directories/non plain text files are concerned ? 
Any help, advice or workaround suggestions would be much appreciated, Thanks Karen VERSION INFO Ubuntu 14.04 Spark 1.1.1 Hadoop 2.5.2 Scala 2.10.4 FULL STACK TRACE 14/12/12 12:02:31 INFO WholeTextFileRDD: Input split: Paths:/graphx/ anywebsite.com/anywebsite.com/archive.html:0+3247,/graphx/anywebsite.com/anywebsite.com/contact.html:0+3050,/graphx/anywebsite.com/anywebsite.com/css:0+0,/graphx/anywebsite.com/anywebsite.com/exhibitions.html:0+6663,/graphx/anywebsite.com/anywebsite.com/gallery.html:0+3258,/graphx/anywebsite.com/anywebsite.com/highslide:0+0,/graphx/anywebsite.com/anywebsite.com/highslideIndex:0+0,/graphx/anywebsite.com/anywebsite.com/images:0+0,/graphx/anywebsite.com/anywebsite.com/index-2.html:0+6192,/graphx/anywebsite.com/anywebsite.com/index.html:0+6174,/graphx/anywebsite.com/anywebsite.com/jquery.html:0+326,/graphx/anywebsite.com/anywebsite.com/js:0+0 14/12/12 12:02:31 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.io.FileNotFoundException: Path is not a file: /graphx/ anywebsite.com/anywebsite.com/css at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:519) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:337) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585) at
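Another workaround that may help, depending on the directory layout, is to glob only the files you actually want, so that subdirectories such as css, images and js never end up in the input split — the pattern below is an assumption based on the layout shown in the trace:

  val pages = sc.wholeTextFiles("hdfs://localhost:54310/graphx/anywebsite.com/anywebsite.com/*.html")
  pages.map { case (path, content) => (path, content.length) }.collect().foreach(println)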
Re: Read data from SparkStreaming from Java socket.
I dont' understand what spark streaming socketTextStream is waiting... is it like a server so you just have to send data from a client?? or what's it excepting? 2014-12-12 14:19 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: I have created a Serversocket program which you can find over here https://gist.github.com/akhld/4286df9ab0677a555087 It simply listens to the given port and when the client connects, it will send the contents of the given file. I'm attaching the executable jar also, you can run the jar as: java -jar SocketBenchmark.jar student 12345 io Here student is the file which will be sent to the client whoever connects on 12345, i have it tested and is working with SparkStreaming (socketTextStream). Thanks Best Regards On Fri, Dec 12, 2014 at 6:25 PM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm a newbie with Spark,, I'm just trying to use SparkStreaming and filter some data sent with a Java Socket but it's not working... it works when I use ncat Why is it not working?? My sparkcode is just this: val sparkConf = new SparkConf().setMaster(local[2]).setAppName(Test) val ssc = new StreamingContext(sparkConf, Seconds(5)) val lines = ssc.socketTextStream(localhost, ) val errorLines = lines.filter(_.contains(hello)) errorLines.print() I created a client socket which sends data to that port, but it could connect any address, I guess that Spark doesn't work like a serverSocket... what's the way to send data from a socket with Java to be able to read from socketTextStream?? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Read data from SparkStreaming from Java socket.
socketTextStream is Socket client which will read from a TCP ServerSocket. Thanks Best Regards On Fri, Dec 12, 2014 at 7:21 PM, Guillermo Ortiz konstt2...@gmail.com wrote: I dont' understand what spark streaming socketTextStream is waiting... is it like a server so you just have to send data from a client?? or what's it excepting? 2014-12-12 14:19 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: I have created a Serversocket program which you can find over here https://gist.github.com/akhld/4286df9ab0677a555087 It simply listens to the given port and when the client connects, it will send the contents of the given file. I'm attaching the executable jar also, you can run the jar as: java -jar SocketBenchmark.jar student 12345 io Here student is the file which will be sent to the client whoever connects on 12345, i have it tested and is working with SparkStreaming (socketTextStream). Thanks Best Regards On Fri, Dec 12, 2014 at 6:25 PM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm a newbie with Spark,, I'm just trying to use SparkStreaming and filter some data sent with a Java Socket but it's not working... it works when I use ncat Why is it not working?? My sparkcode is just this: val sparkConf = new SparkConf().setMaster(local[2]).setAppName(Test) val ssc = new StreamingContext(sparkConf, Seconds(5)) val lines = ssc.socketTextStream(localhost, ) val errorLines = lines.filter(_.contains(hello)) errorLines.print() I created a client socket which sends data to that port, but it could connect any address, I guess that Spark doesn't work like a serverSocket... what's the way to send data from a socket with Java to be able to read from socketTextStream?? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
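In other words, the other end has to be a plain TCP server that writes newline-terminated text. A minimal sketch of such a server in Scala — the port and the test lines are placeholders, and socketTextStream would then be pointed at the same host and port:

  import java.io.PrintWriter
  import java.net.ServerSocket

  object LineServer {
    def main(args: Array[String]): Unit = {
      val server = new ServerSocket(9999)   // placeholder port
      val socket = server.accept()          // Spark's socketTextStream connects here
      val out = new PrintWriter(socket.getOutputStream, true)
      var i = 0
      while (true) {
        out.println("hello world " + i)     // each record must end with a newline
        i += 1
        Thread.sleep(1000)
      }
    }
  }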
Re: why is spark + scala code so slow, compared to python?
Try this: https://github.com/RetailRocket/SparkMultiTool This loader solved the slow reading of a big data set of small files from HDFS. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-is-spark-scala-code-so-slow-compared-to-python-tp20636p20657.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.1.1, Hadoop 2.6 - Protobuf conflict
I had this problem also with spark 1.1.1. At the time I was using hadoop 0.20. To get around it I installed hadoop 2.5.2, and set the protobuf.version to 2.5.0 in the build command like so: mvn -Phadoop-2.5 -Dhadoop.version=2.5.2 -Dprotobuf.version=2.5.0 -DskipTests clean package So I changed spark's pom.xml to read the protobuf.version from the command line. If I didn't explicitly set protobuf.version it was picking up an older version that existed on my filesystem somewhere, Karen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-1-Hadoop-2-6-Protobuf-conflict-tp20656p20658.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark 1.1.1, Hadoop 2.6 - Protobuf conflict
There is no hadoop-2.5 profile. You can use hadoop-2.4 for 2.4+. This profile already sets protobuf.version to 2.5.0 for this reason. It is already something you can set on the command line as it is read as a Maven build property. It does not pick up an older version because it's somewhere on your system; it uses what is appropriate for your Hadoop version. No changes needed, you just need to follow http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html and set one profile and hadoop version. On Fri, Dec 12, 2014 at 2:08 PM, kmurph k.l.mur...@qub.ac.uk wrote: I had this problem also with spark 1.1.1. At the time I was using hadoop 0.20. To get around it I installed hadoop 2.5.2, and set the protobuf.version to 2.5.0 in the build command like so: mvn -Phadoop-2.5 -Dhadoop.version=2.5.2 -Dprotobuf.version=2.5.0 -DskipTests clean package So I changed spark's pom.xml to read the protobuf.version from the command line. If I didn't explicitly set protobuf.version it was picking up an older version that existed on my filesystem somewhere, Karen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-1-Hadoop-2-6-Protobuf-conflict-tp20656p20658.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
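Concretely, for the Hadoop 2.6 case in the subject line that would be something like the following, with the version string set as appropriate for your cluster:

  mvn -Phadoop-2.4 -Dhadoop.version=2.6.0 -DskipTests clean package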
Re: Unit testing and Spark Streaming
On Fri, Dec 12, 2014 at 2:17 PM, Eric Loots eric.lo...@gmail.com wrote: How can the log level in test mode be reduced (or extended when needed) ? Hello Eric, The following might be helpful for reducing the log messages during unit testing: http://stackoverflow.com/a/2736/236007 -- Emre Sevinç https://be.linkedin.com/in/emresevinc
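Along those lines, the SparkUtil.silenceSpark() helper referenced in the test setup earlier in this thread isn't shown there; it might look roughly like this (which loggers to silence, and at what level, is a matter of taste):

  import org.apache.log4j.{Level, Logger}

  object SparkUtil {
    def silenceSpark(): Unit = {
      // Quieten the chattiest loggers during unit tests
      Seq("org.apache.spark", "org.eclipse.jetty", "akka").foreach { name =>
        Logger.getLogger(name).setLevel(Level.WARN)
      }
    }
  }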
Re: Including data nucleus tools
Hi, I had time to try it again. I submitted my app by the same command with these additional options: --jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-core-3.2.10.jar,lib/datanucleus-rdbms-3.2.9.jar Now the app successfully creates a hive context. So my question remains: Are the classpath entries from the Spark UI the same classpath as mentioned in the submit script message "Spark assembly has been built with Hive, including Datanucleus jars on classpath"? If so, then why does the script fail to really include the datanucleus jars on the classpath? I found no bug about this on jira. Or is there a way that particular yarn/os settings on our cluster override this? Thanks in advance Jakub -- Original message -- From: spark.dubovsky.ja...@seznam.cz To: Michael Armbrust mich...@databricks.com Date: 7. 12. 2014 3:02:33 Subject: Re: Including data nucleus tools Next try. I copied the whole dist directory created by the make-distribution script to the cluster, not just the assembly jar. Then I used ./bin/spark-submit --num-executors 200 --master yarn-cluster --class org.apache.spark.mllib.CreateGuidDomainDictionary ../spark/root-0.1.jar ${args} ...to run the app again. The startup scripts printed this message: Spark assembly has been built with Hive, including Datanucleus jars on classpath ...so I thought I was finally there. But the job started and failed on the same ClassNotFound exception as before. Is the classpath from the script message just the classpath of the driver? Or is it the same classpath which is affected by the --jars option? I was trying to find out from the scripts but I was not able to find where the --jars option is processed. thanks -- Original message -- From: Michael Armbrust mich...@databricks.com To: spark.dubovsky.ja...@seznam.cz Date: 6. 12. 2014 20:39:13 Subject: Re: Including data nucleus tools On Sat, Dec 6, 2014 at 5:53 AM, spark.dubovsky.ja...@seznam.cz wrote: Bonus question: Should the class org.datanucleus.api.jdo.JDOPersistenceManagerFactory be part of the assembly? Because it is not in the jar now. No, these jars cannot be put into the assembly because they have extra metadata files that live in the same location (so if you put them all in an assembly they overwrite each other). This metadata is used in discovery. Instead they must be manually put on the classpath in their original form (usually using --jars).
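For reference, the full submit command with the extra jars spelled out looks roughly like this (class name, application jar and jar versions are taken from the thread; note there must be no spaces inside the comma-separated --jars list):

  ./bin/spark-submit \
    --master yarn-cluster \
    --num-executors 200 \
    --class org.apache.spark.mllib.CreateGuidDomainDictionary \
    --jars lib/datanucleus-api-jdo-3.2.6.jar,lib/datanucleus-core-3.2.10.jar,lib/datanucleus-rdbms-3.2.9.jar \
    ../spark/root-0.1.jar ${args}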
Re: Spark CDH5 packages
Hi Sowen, Thanks for the tip. When will CDH 5.3 be released? Some sort of timeline would be helpful. Thanks, Jing On 12 Dec 2014, at 12:34, Sean Owen so...@cloudera.com wrote: No, CDH 5.2 includes Spark 1.1 actually, which is the latest released minor version; 5.3 will include 1.2, which not released yet. You can make a build of just about any version of Spark for CDH5, and manually install it yourself, sure, but easier would be to just update CDH. The instructions are the ones you've found; all you need to know is to use the hadoop-2.4 profile (which is for 2.5 and 2.6 too) and set hadoop.version appropriately. On Fri, Dec 12, 2014 at 12:26 PM, Jing Dong j...@qubitdigital.com wrote: Hi, I'm new to this list, so please excuse if I'm asking simple questions. We are experimenting spark deployment on existing CDH clusters. However the spark package come with CDH are very out of date (v1.0.0). Has anyone had experience with custom Spark upgrade for CDH5? Any installation or packaging recommendation would be appreciate. The download page and documentation site only mention about CDH4 pre build package. Thanks Jing - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark streaming: missing classes when kafka consumer classes
Hi, I asked on SO and got an answer about this http://stackoverflow.com/questions/27444512/missing-classes-from-the-assembly-file-created-by-sbt-assembly . Adding fullClasspath in assembly := (fullClasspath in Compile).value at the end of my builld.sbt solved the problem, apparently. Best, Mario On 11.12.2014 20:04, Flávio Santos wrote: Hi Mario, Try to include this to your libraryDependencies (in your sbt file): org.apache.kafka % kafka_2.10 % 0.8.0 exclude(javax.jms, jms) exclude(com.sun.jdmk, jmxtools) exclude(com.sun.jmx, jmxri) exclude(org.slf4j, slf4j-simple) Regards, *-- Flávio R. Santos* Chaordic | /Platform/ _www.chaordic.com.br http://www.chaordic.com.br/_ +55 48 3232.3200 On Thu, Dec 11, 2014 at 12:32 PM, Mario Pastorelli mario.pastore...@teralytics.ch mailto:mario.pastore...@teralytics.ch wrote: Thanks akhil for the answer. I am using sbt assembly and the build.sbt is in the first email. Do you know why those classes are included in that way? Thanks, Mario On 11.12.2014 14:51, Akhil Das wrote: Yes. You can do/use *sbt assembly* and create a big fat jar with all dependencies bundled inside it. Thanks Best Regards On Thu, Dec 11, 2014 at 7:10 PM, Mario Pastorelli mario.pastore...@teralytics.ch mailto:mario.pastore...@teralytics.ch wrote: In this way it works but it's not portable and the idea of having a fat jar is to avoid exactly this. Is there any system to create a self-contained portable fatJar? On 11.12.2014 13:57, Akhil Das wrote: Add these jars while creating the Context. val sc = new SparkContext(conf) sc.addJar(/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/*spark-streaming-kafka_2.10-1.1.0.jar*) sc.addJar(/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/*zkclient-0.3.jar*) sc.addJar(/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/*metrics-core-2.2.0.jar*) sc.addJar(/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/*kafka_2.10-0.8.0.jar*) val ssc = new StreamingContext(sc, Seconds(10)) Thanks Best Regards On Thu, Dec 11, 2014 at 6:22 PM, Mario Pastorelli mario.pastore...@teralytics.ch mailto:mario.pastore...@teralytics.ch wrote: Hi, I'm trying to use spark-streaming with kafka but I get a strange error on class that are missing. I would like to ask if my way to build the fat jar is correct or no. 
My program is val kafkaStream = KafkaUtils.createStream(ssc, zookeeperQuorum, kafkaGroupId, kafkaTopicsWithThreads) .map(_._2) kafkaStream.foreachRDD((rdd,t) = rdd.foreachPartition { iter:Iterator[CellWithLAC] = println(time: ++ t.toString ++ #received: ++ iter.size.toString) }) I use sbt to manage my project and my build.sbt (with assembly 0.12.0 plugin) is name := spark_example version := 0.0.1 scalaVersion := 2.10.4 scalacOptions ++= Seq(-deprecation,-feature) libraryDependencies ++= Seq( org.apache.spark % spark-streaming_2.10 % 1.1.1, org.apache.spark % spark-streaming-kafka_2.10 % 1.1.1, joda-time % joda-time % 2.6 ) assemblyMergeStrategy in assembly := { case p if p startsWith com/esotericsoftware/minlog = MergeStrategy.first case p if p startsWith org/apache/commons/beanutils = MergeStrategy.first case p if p startsWith org/apache/ = MergeStrategy.last case plugin.properties = MergeStrategy.discard case p if p startsWith META-INF = MergeStrategy.discard case x = val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) } I create the jar with sbt assembly and the run with $SPARK_HOME/bin/spark-submit --master spark://master:7077 --class Main target/scala-2.10/spark_example-assembly-0.0.1.jar localhost:2181 test-consumer-group test1 where master:7077 is the spark master, localhost:2181 is zookeeper, test-consumer-group is kafka groupid and test1 is the kafka topic. The program starts and keep running but I get an error and nothing is printed. In the log I found the following stack trace: 14/12/11 13:02:08 INFO network.ConnectionManager: Accepted connection from [10.0.3.1/10.0.3.1:54325 http://10.0.3.1/10.0.3.1:54325] 14/12/11 13:02:08 INFO network.SendingConnection: Initiating
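For reference, a sketch of the relevant build.sbt fragment with the fix from the first message in this thread folded in (dependency versions are copied from the thread; whether the fullClasspath override is still needed may depend on the sbt-assembly version):

  libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-streaming_2.10"       % "1.1.1",
    "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.1.1",
    "joda-time"        % "joda-time"                  % "2.6"
  )

  // Fix reported in this thread: build the assembly from the full compile classpath
  fullClasspath in assembly := (fullClasspath in Compile).value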
Cannot pickle DecisionTreeModel in the pyspark
Hi everyone, I am trying to save the decision tree model in Python and I use pickle.dump() to do this. However, it returns the following error: cPickle.UnpickleableError: Cannot pickle type 'thread.lock' objects. I did some tests on the other models. It seems that the decision tree model is the only model in pyspark that we cannot pickle. FYI: I use Spark 1.1.1. Do you have any idea how to solve this problem? (I don't know whether using Scala would solve this problem or not.) Thanks a lot in advance for your help. Cheers Gen -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-pickle-DecisionTreeModel-in-the-pyspark-tp20661.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Session for connections?
Looks like the way to go. Quick question regarding the connection pool approach - if I have a connection that gets lazily instantiated, will it automatically die if I kill the driver application? In my scenario, I can keep a connection open for the duration of the app, and aren't that concerned about having idle connections as long as the app is running. For this specific scenario, do I still need to think of the timeout, or would it be shut down when the driver stops? (Using a stand alone cluster btw). Regards, Ashic. From: tathagata.das1...@gmail.com Date: Thu, 11 Dec 2014 06:33:49 -0800 Subject: Re: Session for connections? To: as...@live.com CC: user@spark.apache.org Also, this is covered in the streaming programming guide in bits and pieces. http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd On Thu, Dec 11, 2014 at 4:55 AM, Ashic Mahtab as...@live.com wrote: That makes sense. I'll try that. Thanks :) From: tathagata.das1...@gmail.com Date: Thu, 11 Dec 2014 04:53:01 -0800 Subject: Re: Session for connections? To: as...@live.com CC: user@spark.apache.org You could create a lazily initialized singleton factory and connection pool. Whenever an executor starts running the firt task that needs to push out data, it will create the connection pool as a singleton. And subsequent tasks running on the executor is going to use the connection pool. You will also have to intelligently shutdown the connections because there is not a obvious way to shut them down. You could have a usage timeout - shutdown connection after not being used for 10 x batch interval. TD On Thu, Dec 11, 2014 at 4:28 AM, Ashic Mahtab as...@live.com wrote: Hi, I was wondering if there's any way of having long running session type behaviour in spark. For example, let's say we're using Spark Streaming to listen to a stream of events. Upon receiving an event, we process it, and if certain conditions are met, we wish to send a message to rabbitmq. Now, rabbit clients have the concept of a connection factory, from which you create a connection, from which you create a channel. You use the channel to get a queue, and finally the queue is what you publish messages on. Currently, what I'm doing can be summarised as : dstream.foreachRDD(x = x.forEachPartition(y = { val factory = .. val connection = ... val channel = ... val queue = channel.declareQueue(...); y.foreach(z = Processor.Process(z, queue)); cleanup the queue stuff. })); I'm doing the same thing for using Cassandra, etc. Now in these cases, the session initiation is expensive, so foing it per message is not a good idea. However, I can't find a way to say hey...do this per worker once and only once. Is there a better pattern to do this? Regards, Ashic. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
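A sketch of the lazily initialized singleton being discussed, using RabbitMQ as in the original question — the object name, host and Processor call are placeholders, and the shutdown hook is one way to close the connection when the executor JVM exits (e.g. when the driver stops on a standalone cluster):

  object RabbitSink {
    // Initialized once per executor JVM, the first time a task on that executor needs it.
    lazy val channel = {
      val factory = new com.rabbitmq.client.ConnectionFactory()
      factory.setHost("rabbit-host")              // placeholder host
      val connection = factory.newConnection()
      sys.addShutdownHook { connection.close() }  // close when the executor shuts down
      connection.createChannel()
    }
  }

  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      records.foreach(record => Processor.Process(record, RabbitSink.channel))
    }
  }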
Re: Spark Streaming in Production
Thanks for the reply. I might be misunderstanding something basic. As far as I can tell, the cluster manager (e.g. Mesos) manages the master and worker nodes but not the drivers or receivers; those are external to the Spark cluster: http://spark.apache.org/docs/latest/cluster-overview.html I know that the spark-submit script has a --deploy-mode cluster option. Does this mean that the receiver will be managed on the cluster? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-Production-tp20644p20662.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming in Production
IIUC, Receivers run on workers, colocated with other tasks. The Driver, on the other hand, can either run on the querying machine (local mode) or as a worker (cluster mode). — FG On Fri, Dec 12, 2014 at 4:49 PM, twizansk twiza...@gmail.com wrote: Thanks for the reply. I might be misunderstanding something basic.As far as I can tell, the cluster manager (e.g. Mesos) manages the master and worker nodes but not the drivers or receivers, those are external to the spark cluster: http://spark.apache.org/docs/latest/cluster-overview.html I know that the spark-submit script has a --deploy-mode cluster option. Does this mean that the receiver will be managed on the cluster? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-in-Production-tp20644p20662.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Adding a column to a SchemaRDD
(1) I understand about immutability, that's why I said I wanted a new SchemaRDD. (2) I specifically asked for a non-SQL solution that takes a SchemaRDD and results in a new SchemaRDD with one new function. (3) The DSL stuff is a big clue, but I can't find adequate documentation for it. What I'm looking for is something like: import org.apache.spark.sql._ val sqlc = new SQLContext(sc) import sqlc._ val data = sc.parallelize(0 to 99).map(n => ("{\"seven\": {\"mod\": %d, \"times\": %d}, " + "\"eleven\": {\"mod\": %d, \"times\": %d}}").format(n % 7, n * 7, n % 11, n * 11)) val jdata = sqlc.jsonRDD(data) jdata.registerTempTable("jdata") val sqlVersion = sqlc.sql("SELECT *, (seven.mod + eleven.mod) AS modsum FROM jdata") This sqlVersion works fine, but if I try to do the same thing with a programmatic function, I'm missing a bunch of pieces: - I assume I'd need to start with something like: jdata.select('*, 'seven.mod, 'eleven.mod) and then get and process the last two elements. The problems are: - I can't select '* - there seems to be no way to get the complete row - I can't select 'seven.mod or 'eleven.mod - the symbol evaluation seems only one deep. - Assuming I could do that, I don't see a way to make the result into a SchemaRDD. I assume I would have to do something like: 1. take my row and value, and create a new, slightly longer row 2. take my old schema, and create a new schema with one more field at the end, named and typed appropriately 3. combine the two into a SchemaRDD I think I see how to do 3, but 1 and 2 elude me. Is there more complete documentation somewhere for the DSL portion? Anyone have a clue about any of the above? On Fri, Dec 12, 2014 at 6:01 AM, Yanbo Liang yanboha...@gmail.com wrote: RDD is immutable so you cannot modify it. If you want to modify some value or the schema in an RDD, use map to generate a new RDD. The following code is for your reference: def add(a: Int, b: Int): Int = { a + b } val d1 = sc.parallelize(1 to 10).map { i => (i, i+1, i+2) } val d2 = d1.map { i => (i._1, i._2, add(i._1, i._2)) } d2.foreach(println) Otherwise, if your user-defined function is straightforward and you can represent it in SQL, using Spark SQL or the DSL is also a good choice. case class Person(id: Int, score: Int, value: Int) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val d1 = sc.parallelize(1 to 10).map { i => Person(i, i+1, i+2) } val d2 = d1.select('id, 'score, 'id + 'score) d2.foreach(println) 2014-12-12 14:11 GMT+08:00 Nathan Kronenfeld nkronenf...@oculusinfo.com: Hi, there. I'm trying to understand how to augment data in a SchemaRDD. I can see how to do it if I can express the added values in SQL - just run SELECT *, valueCalculation AS newColumnName FROM table. I've been searching all over for how to do this if my added value is a Scala function, with no luck. Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a new column, D, calculated using Utility.process(b, c), and I want (of course) to pass in the values B and C from each row, ending up with a new SchemaRDD with columns A, B, C, and D. Is this possible? If so, how? Thanks, -Nathan -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
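In case it helps, a sketch of the "longer row + longer schema + reassemble" route for steps 1-3, using the programmatic schema API (StructType / StructField / applySchema) available in recent Spark SQL versions. The process function is a stand-in for Utility.process, and the table and column types are assumptions made only so the example is self-contained and runnable:

  import org.apache.spark.sql._

  val sqlc = new SQLContext(sc)

  // A small stand-in table with columns a, b, c
  val rows = sc.parallelize(0 until 10).map(n => Row(n, n * 2, n * 3))
  val schema = StructType(Seq(
    StructField("a", IntegerType, nullable = false),
    StructField("b", IntegerType, nullable = false),
    StructField("c", IntegerType, nullable = false)))
  val original = sqlc.applySchema(rows, schema)

  def process(b: Int, c: Int): Int = b + c   // stand-in for Utility.process

  // 1. extend each row, 2. extend the schema, 3. reassemble a SchemaRDD
  val longerRows = original.map(row => Row((row.toSeq :+ process(row.getInt(1), row.getInt(2))): _*))
  val longerSchema = StructType(original.schema.fields :+ StructField("d", IntegerType, nullable = false))
  val augmented = sqlc.applySchema(longerRows, longerSchema)
  augmented.registerTempTable("augmented")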
Re: Unit testing and Spark Streaming
https://github.com/jayunit100/SparkStreamingCassandraDemo On this note, I've built a framework which is mostly pure so that functional unit tests can be run composing mock data for Twitter statuses, with just regular junit... That might be relevant also. I think at some point we should come up with a robust test driven framework for building stream apps... And the idea of Scala test with the injection and comparison you did might be a good start. Thanks for starting this dialogue! On Dec 12, 2014, at 9:18 AM, Emre Sevinc emre.sev...@gmail.com wrote: On Fri, Dec 12, 2014 at 2:17 PM, Eric Loots eric.lo...@gmail.com wrote: How can the log level in test mode be reduced (or extended when needed) ? Hello Eric, The following might be helpful for reducing the log messages during unit testing: http://stackoverflow.com/a/2736/236007 -- Emre Sevinç https://be.linkedin.com/in/emresevinc
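On the log level question: one way to quiet Spark's output from the test code itself (rather than shipping a log4j.properties) is to raise the level programmatically before the StreamingContext is created; a small sketch, assuming the standard log4j 1.x that ships with Spark:

import org.apache.log4j.{Level, Logger}

// Silence the chattiest loggers during unit tests; put this in a test setup method
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)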
Do I need to apply feature scaling via StandardScaler for LBFGS for Linear Regression?
Hi, Trying to use LBFGS as the optimizer, do I need to implement feature scaling via StandardScaler or does LBFGS do it by default? The following code generated the error "Failure again! Giving up and returning, Maybe the objective is just poorly behaved ?".

val data = sc.textFile("file:///data/Train/final2.train")
val parsedata = data.map { line =>
  val partsdata = line.split(',')
  LabeledPoint(partsdata(0).toDouble, Vectors.dense(partsdata(1).split(' ').map(_.toDouble)))
}
val train = parsedata.map(x => (x.label, MLUtils.appendBias(x.features))).cache()
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 50
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](2))
val (weightsWithIntercept, loss) = LBFGS.runLBFGS(train, new LeastSquaresGradient(), new SquaredL2Updater(), numCorrections, convergenceTol, maxNumIterations, regParam, initialWeightsWithIntercept)

Did I implement LBFGS for Linear Regression via LeastSquaresGradient() correctly? Thanks Tri
RDD lineage and broadcast variables
I'm still wrapping my head around the fact that the data backing an RDD is immutable, since an RDD may need to be reconstructed from its lineage at any point. In the context of clustering there are many iterations where an RDD may need to change (for instance cluster assignments, etc.) based on a broadcast variable of a list of centroids, which are objects that in turn contain a list of features. So immutability is all well and good for the purposes of being able to replay a lineage. But now I'm wondering: during each iteration in which this RDD goes through many transformations, it will be transforming based on that broadcast variable of centroids, which are mutable. How would it replay the lineage in this instance? Does a dependency on mutable variables mess up the whole lineage thing? Any help appreciated. Just trying to wrap my head around using Spark correctly. I will say it does seem like there is a common misconception that Spark RDDs are in-memory arrays - but perhaps this is for a reason. Perhaps in some cases an option for mutability and a failure exception is exactly what is needed for a one-off algorithm that doesn't necessarily need resiliency. Just a thought.
Passing Spark Configuration from Driver (Master) to all of the Slave nodes
Hi to all, Our problem was passing configuration from Spark Driver to the Slaves. After a lot of time spent figuring out how things work, this is the solution I came up with. Hope this will be helpful for others as well. You can read about it in my Blog Post http://progexc.blogspot.co.il/2014/12/spark-configuration-mess-solved.html -- Enjoy, Demi Ben-Ari Senior Software Engineer Windward LTD.
Re: Spark Server - How to implement
Thanks Marcelo. Spark Gurus/Databricks team - do you have something in roadmap for such a spark server ? Thanks, On Thu, Dec 11, 2014 at 5:43 PM, Marcelo Vanzin van...@cloudera.com wrote: Oops, sorry, fat fingers. We've been playing with something like that inside Hive: https://github.com/apache/hive/tree/spark/spark-client That seems to have at least a few of the characteristics you're looking for; but it's a very young project, and at this moment we're not developing it as a public API, but mostly for internal Hive use. It can give you a few ideas, though. Also, SPARK-3215. On Thu, Dec 11, 2014 at 5:41 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Manoj, I'm not aware of any public projects that do something like that, except for the Ooyala server which you say doesn't cover your needs. We've been playing with something like that inside Hive, though: On Thu, Dec 11, 2014 at 5:33 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi, If spark based services are to be exposed as a continuously available server, what are the options? * The API exposed to client will be proprietary and fine grained (RPC style ..), not a Job level API * The client API need not be SQL so the Thrift JDBC server does not seem to be option .. but I could be wrong here ... * Ooyala implementation is a REST API for job submission, but as mentioned above; the desired API is a finer grain API, not a job submission Any existing implementation? Is it build your own server? Any thoughts on approach to use ? Thanks, -- Marcelo -- Marcelo
How to get driver id?
Hi Guys: I want to kill an application but I could not find the driver id of the application from web ui. Is there any way to get it from command line? Thanks -- Sincerely Yours Xingwei Yang https://sites.google.com/site/xingweiyang1223/
Re: Do I need to apply feature scaling via StandardScaler for LBFGS for Linear Regression?
You need to do the StandardScaler to help the convergence yourself. LBFGS just takes whatever objective function you provide without doing any scaling. I would like to provide LinearRegressionWithLBFGS, which does the scaling internally, in the near future. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 8:49 AM, Bui, Tri tri@verizonwireless.com.invalid wrote: Hi, Trying to use LBFGS as the optimizer, do I need to implement feature scaling via StandardScaler or does LBFGS do it by default? The following code generated the error “Failure again! Giving up and returning, Maybe the objective is just poorly behaved ?”. val data = sc.textFile("file:///data/Train/final2.train") val parsedata = data.map { line => val partsdata = line.split(',') LabeledPoint(partsdata(0).toDouble, Vectors.dense(partsdata(1).split(' ').map(_.toDouble))) } val train = parsedata.map(x => (x.label, MLUtils.appendBias(x.features))).cache() val numCorrections = 10 val convergenceTol = 1e-4 val maxNumIterations = 50 val regParam = 0.1 val initialWeightsWithIntercept = Vectors.dense(new Array[Double](2)) val (weightsWithIntercept, loss) = LBFGS.runLBFGS(train, new LeastSquaresGradient(), new SquaredL2Updater(), numCorrections, convergenceTol, maxNumIterations, regParam, initialWeightsWithIntercept) Did I implement LBFGS for Linear Regression via “LeastSquaresGradient()” correctly? Thanks Tri - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Do I need to apply feature scaling via StandardScaler for LBFGS for Linear Regression?
Thanks for the confirmation. FYI, the code below works for a similar dataset, but with the feature magnitude changed, LBFGS converged to the right weights. For example, a time-sequential feature with values 1, 2, 3, 4, 5 would generate the error, while the sequential feature 14111, 14112, 14113, 14115 would converge to the right weight. Why? Below is code to implement StandardScaler() for sample data (10246.0,[14111.0,1.0]): val scaler1 = new StandardScaler().fit(train.map(x => x.features)) val train1 = train.map(x => (x.label, scaler1.transform(x.features))) But I keep getting the error: value features is not a member of (Double, org.apache.spark.mllib.linalg.Vector) Should my feature vector be .toInt instead of Double? Also, the error org.apache.spark.mllib.linalg.Vector should have an s to match the import library org.apache.spark.mllib.linalg.Vectors Thanks Tri -Original Message- From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com] Sent: Friday, December 12, 2014 12:16 PM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Do I need to apply feature scaling via StandardScaler for LBFGS for Linear Regression? You need to do the StandardScaler to help the convergence yourself. LBFGS just takes whatever objective function you provide without doing any scaling. I would like to provide LinearRegressionWithLBFGS, which does the scaling internally, in the near future. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 8:49 AM, Bui, Tri tri@verizonwireless.com.invalid wrote: Hi, Trying to use LBFGS as the optimizer, do I need to implement feature scaling via StandardScaler or does LBFGS do it by default? The following code generated the error “Failure again! Giving up and returning, Maybe the objective is just poorly behaved ?”. val data = sc.textFile("file:///data/Train/final2.train") val parsedata = data.map { line => val partsdata = line.split(',') LabeledPoint(partsdata(0).toDouble, Vectors.dense(partsdata(1).split(' ').map(_.toDouble))) } val train = parsedata.map(x => (x.label, MLUtils.appendBias(x.features))).cache() val numCorrections = 10 val convergenceTol = 1e-4 val maxNumIterations = 50 val regParam = 0.1 val initialWeightsWithIntercept = Vectors.dense(new Array[Double](2)) val (weightsWithIntercept, loss) = LBFGS.runLBFGS(train, new LeastSquaresGradient(), new SquaredL2Updater(), numCorrections, convergenceTol, maxNumIterations, regParam, initialWeightsWithIntercept) Did I implement LBFGS for Linear Regression via “LeastSquaresGradient()” correctly? Thanks Tri - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
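For what it's worth, the "value features is not a member" error above is a type mismatch rather than a scaling problem: train is an RDD of (Double, Vector) pairs, not of LabeledPoint, so it has no .features field. A small sketch of fitting the scaler on the LabeledPoint RDD instead (variable names follow the code earlier in this thread; whether this also fixes the convergence is a separate question):

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.util.MLUtils

// parsedata is the RDD[LabeledPoint] built from the text file, so .features exists here
val scaler = new StandardScaler(withMean = true, withStd = true).fit(parsedata.map(_.features))
val scaledTrain = parsedata
  .map(p => (p.label, MLUtils.appendBias(scaler.transform(p.features))))
  .cache()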
resource allocation spark on yarn
Hi All, I have spark on yarn and there are multiple spark jobs on the cluster. Sometimes some jobs are not getting enough resources even when there are enough free resources available on cluster, even when I use below settings --num-workers 75 \ --worker-cores 16 Jobs stick with the resources what they get when job started. Do we need to look at any other configs ? can some one give pointers on this issue. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/resource-allocation-spark-on-yarn-tp20664.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
GraphX for large scale PageRank (~4 billion nodes, ~128 billion edges)
Hi! tldr; We're looking at potentially using Spark+GraphX to compute PageRank over a 4 billion node + 128 billion edge graph on a regular (monthly) basis, possibly growing larger in size over time. If anyone has hints / tips / upcoming optimizations I should test out (or wants to contribute -- we'll pay the EC2 credits!) for running large scale graph processing with Spark+GraphX on EC2+S3, I'd love to hear it! =] First, I must say, I'm quite excited for the rise in the Spark ecosystem -- Spark makes life so much easier -- and it's for that very reason I'm looking to use it for some of our processing work at the tech non-profit CommonCrawl http://commoncrawl.org/. To improve our crawl, we're aiming to run PageRank monthly on our crawl archives. As we can run the whole pipeline in Spark+GraphX, it's really quite a tempting proposition for us. For that reason, I've been looking at replicating the existing experiments from the GraphX: Unifying Data-Parallel and Graph-Parallel Analytics http://arxiv.org/abs/1402.2394 paper to make sure my general setup and experimental methodology are sound before attempting to scale up to the larger dataset. One issue is that the paper states the hardware, but not the convergence tolerance or number of iterations for PageRank. I've read a separate figure in a Spark presentation http://www.graphanalysis.org/IPDPS2014-workshop/Gonzales.pdf that reports 68 seconds for 10 iterations, but no clue if 10 iterations are comparable. I'm using the Spark EC2 spin up scripts, and other than some minor issues such as Ganglia failing[1], getting a cluster running has been positive and smooth sailing. *Hardware:* All experiments were done with either 16 m2.4xlarge nodes or 32 r3.xlarge machines. That has comparable RAM and CPU to the machines used in the paper whilst also having SSD instead of magnetic disk. I've found more machines can work better for downloading large amounts of data from S3, where our target dataset will be stored. *Experiments (30 iterations of PageRank, taking away data loading time)* *LiveJournal:*Total runtime: [4:19|4:32|5:16|...] Loading: [1:36|1:48|...] Per iteration: 6 seconds *Experiments (Twitter - 41 million nodes, 1.4 billion edges):* Total runtime: 28 minutes Loading: 4.3 minutes Per iteration: 47 seconds (Special note: with memory+disk serialization and compressed RDD [snappy], it's ~100 seconds per iteration (54 minutes in total with 4.5 minutes loading)) *Experiments (WebDataCommons PLD -- 43 million nodes, 623 million edges):* This should be on a similar scale to the Twitter graph (41 million nodes, 1.4 billion edges) and it's a smaller representation of what I'm aiming to tackle. Data freely available here (2.7GB): http://webdatacommons.org/hyperlinkgraph/#toc2 Total runtime: 45 minutes Loading: 25 minutes (silly single file -- didn't split it beforehand plus format was gzip and not bzip2 so wasn't splittable) Per iteration: 40 seconds *20 / 697 of WebDataCommons Page Graph (4 billion nodes, 128 billion edges)* Data freely available here (331GB for the full dataset): http://webdatacommons.org/hyperlinkgraph/#toc2 Vertices: 157155245, Edges: 3611537984 Consistent java.lang.Long cannot be cast to scala.Tuple2 class cast exceptions [2] produced by the SortShuffleWriter / ExternalSorter but it's slowly getting done - primarily as some of the partitions don't seem to need the external sort or don't trigger the error. Potentially due to using SORT instead of HASH, but I've not tested that yet. 
Per iteration: 5-6 minutes (didn't complete the full job) *Both 50 / 697 and 100 / 697 experiment (failed)* 50 / 697 has Vertices: 326314372, Edges: 9072089327 100 / 697 couldn't report the graph size Java heap space exceptions everywhere :P OpenHashSet in EdgePartitionBuilder https://github.com/amplab/graphx/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala#L55 seems exceeded -- possibly as there could still be millions or even billions of unique nodes in a single partition. *Code:* I've put the code up on GitHub under the terrible name graphx-prank https://github.com/Smerity/graphx-prank. The given code allows me to run large graphs locally on my laptop for testing, whereas the GraphX PageRank example tends to fail at the memory intensive parts. I can run LiveJournal on my underpowered laptop using my code, for example, but not using the GraphX PageRank example. The aim is for the code to be used as part of a pipeline at Common Crawl for extracting a hyperlink graph, computing PageRank over it, then storing the results to determine what we crawl next. *General questions and comments:* (a) Is this size of graph sane to use with GraphX yet with 50 m2.4xlarge nodes or 100 r3.xlarge machines? I know optimizations for insanely large graphs are coming in but I'm potentially a little early? (b) Any improvements / optimizations for running Spark and GraphX at this scale? (c) Are there any other example scripts in the
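For anyone trying to reproduce a small-scale version of the runs above, a minimal, untested sketch of loading an edge list and running a fixed number of PageRank iterations with GraphX (the path is a placeholder):

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

// Edge list in plain "srcId dstId" text format, e.g. the LiveJournal or PLD graphs
val graph = GraphLoader.edgeListFile(sc, "s3n://some-bucket/edges/part-*")
  .partitionBy(PartitionStrategy.EdgePartition2D)

// 10 fixed iterations, comparable to the figures quoted above
val ranks = graph.staticPageRank(10).vertices
println(ranks.take(5).mkString("\n"))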
how to convert an rdd to a single output file
I have an RDD which is potentially too large to store in memory with collect. I want a single task to write the contents as a file to HDFS. Time is not a large issue but memory is. I use the following, converting my RDD (scans) to a local Iterator. This works, but hasNext shows up as a separate task and takes on the order of 20 sec for a medium-sized job - is toLocalIterator a bad function to call in this case, and is there a better one?

public void writeScores(final Appendable out, JavaRDD<IScoredScan> scans) {
    writer.appendHeader(out, getApplication());
    Iterator<IScoredScan> scanIterator = scans.toLocalIterator();
    while (scanIterator.hasNext()) {
        IScoredScan scan = scanIterator.next();
        writer.appendScan(out, getApplication(), scan);
    }
    writer.appendFooter(out, getApplication());
}
Re: Do I need to apply feature scaling via StandardScaler for LBFGS for Linear Regression?
It seems that your response is not scaled which will cause issue in LBFGS. Typically, people train Linear Regression with zero-mean/unit-variable feature and response without training the intercept. Since the response is zero-mean, the intercept will be always zero. When you convert the coefficients to the oringal space from the scaled space, the intercept can be computed by w0 = y - \sum x_n w_n where x_n is the average of column n. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 10:49 AM, Bui, Tri tri@verizonwireless.com wrote: Thanks for the confirmation. Fyi..The code below works for similar dataset, but with the feature magnitude changed, LBFGS converged to the right weights. Example, time sequential Feature value 1, 2, 3, 4, 5, would generate the error while sequential feature 14111, 14112, 14113,14115 would converge to the right weight. Why? Below is code to implement standardscaler() for sample data (10246.0,[14111.0,1.0])): val scaler1 = new StandardScaler().fit(train.map(x = x.features)) val train1 = train.map(x = (x.label, scaler1.transform(x.features))) But I keeps on getting error: value features is not a member of (Double, org.apache.spark.mllib.linalg.Vector) Should my feature vector be .toInt instead of Double? Also, the error org.apache.spark.mllib.linalg.Vector should have an s to match import library org.apache.spark.mllib.linalg.Vectors Thanks Tri -Original Message- From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com] Sent: Friday, December 12, 2014 12:16 PM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Do I need to applied feature scaling via StandardScaler for LBFGS for Linear Regression? You need to do the StandardScaler to help the convergency yourself. LBFGS just takes whatever objective function you provide without doing any scaling. I will like to provide LinearRegressionWithLBFGS which does the scaling internally in the nearly feature. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 8:49 AM, Bui, Tri tri@verizonwireless.com.invalid wrote: Hi, Trying to use LBFGS as the optimizer, do I need to implement feature scaling via StandardScaler or does LBFGS do it by default? Following code generated error “ Failure again! Giving up and returning, Maybe the objective is just poorly behaved ?”. val data = sc.textFile(file:///data/Train/final2.train) val parsedata = data.map { line = val partsdata = line.split(',') LabeledPoint(partsdata(0).toDouble, Vectors.dense(partsdata(1).split(' ').map(_.toDouble))) } val train = parsedata.map(x = (x.label, MLUtils.appendBias(x.features))).cache() val numCorrections = 10 val convergenceTol = 1e-4 val maxNumIterations = 50 val regParam = 0.1 val initialWeightsWithIntercept = Vectors.dense(new Array[Double](2)) val (weightsWithIntercept, loss) = LBFGS.runLBFGS(train, new LeastSquaresGradient(), new SquaredL2Updater(), numCorrections, convergenceTol, maxNumIterations, regParam, initialWeightsWithIntercept) Did I implement LBFGS for Linear Regression via “LeastSquareGradient()” correctly? Thanks Tri - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
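A small worked sketch of the w0 formula above, assuming parsedata is an RDD[LabeledPoint] in the original (unscaled) space and weightsOrig is a placeholder for the coefficients already converted back to that space:

import org.apache.spark.SparkContext._
import org.apache.spark.mllib.stat.Statistics

val featureMeans = Statistics.colStats(parsedata.map(_.features)).mean.toArray
val labelMean = parsedata.map(_.label).mean()

// w0 = mean(y) - sum over n of ( mean(x_n) * w_n )
val intercept = labelMean - featureMeans.zip(weightsOrig).map { case (m, w) => m * w }.sum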
Spark 1.2 + Avro file does not work in HDP2.2
Hi Experts, I have recently installed HDP2.2(Depends on hadoop 2.6). My spark 1.2 is built with hadoop 2.4 profile. My program has following dependencies val avro= org.apache.avro % avro-mapred %1.7.7 val spark = org.apache.spark % spark-core_2.10 % 1.2.0 % provided My program to read avro files fails with the following error. What am I doing wrong? java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected at org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
Spark 1.2 + Avro does not work in HDP2.2
Hi Experts, I have recently installed HDP2.2(Depends on hadoop 2.6). My spark 1.2 is built with hadoop 2.3 profile. /( mvn -Pyarn -Dhadoop.version=2.6.0 -Dyarn.version=2.6.0 -Phadoop-2.3 -Phive -DskipTests clean package)/ My program has following dependencies /val avro= org.apache.avro % avro-mapred %1.7.7 val spark = org.apache.spark % spark-core_2.10 % 1.2.0 % provided/ My program to read avro files fails with the following error. What am I doing wrong? Thanks Manas java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.TaskAttemptContext, but class was expected at org.apache.avro.mapreduce.AvroKeyInputFormat.createRecordReader(AvroKeyInputFormat.java:47) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.init(NewHadoopRDD.scala:133) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107) at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) - Manas Kar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-Avro-does-not-work-in-HDP2-2-tp20667.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
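That "Found interface ... but class was expected" error is the classic hadoop1 vs hadoop2 binary mismatch: the default avro-mapred artifact is compiled against the old mapred API. One thing worth trying (not verified against HDP2.2 specifically) is pulling in the hadoop2 classifier of avro-mapred in the sbt build:

libraryDependencies ++= Seq(
  // the "hadoop2" classifier of avro-mapred is built against the new org.apache.hadoop.mapreduce API
  "org.apache.avro" % "avro-mapred" % "1.7.7" classifier "hadoop2",
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
)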
Re: how to convert an rdd to a single output file
Instead of doing this on the compute side, I would just write out the file with different blocks initially into HDFS and then use hadoop fs -getmerge or HDFSConcat to get one final output file. - SF On Fri, Dec 12, 2014 at 11:19 AM, Steve Lewis lordjoe2...@gmail.com wrote: I have an RDD which is potentially too large to store in memory with collect. I want a single task to write the contents as a file to hdfs. Time is not a large issue but memory is. I say the following converting my RDD (scans) to a local Iterator. This works but hasNext shows up as a separate task and takes on the order of 20 sec for a medium sized job - is *toLocalIterator a bad function to call in this case and is there a better one?* *public void writeScores(final Appendable out, JavaRDDIScoredScan scans) { writer.appendHeader(out, getApplication());IteratorIScoredScan scanIterator = scans.toLocalIterator();while(scanIterator.hasNext()) { IScoredScan scan = scanIterator.next();writer.appendScan(out, getApplication(), scan);}writer.appendFooter(out, getApplication());}*
RE: Do I need to apply feature scaling via StandardScaler for LBFGS for Linear Regression?
Thanks for the info. How do I use StandardScaler() to scale example data (10246.0,[14111.0,1.0]) ? Thx tri -Original Message- From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com] Sent: Friday, December 12, 2014 1:26 PM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Do I need to applied feature scaling via StandardScaler for LBFGS for Linear Regression? It seems that your response is not scaled which will cause issue in LBFGS. Typically, people train Linear Regression with zero-mean/unit-variable feature and response without training the intercept. Since the response is zero-mean, the intercept will be always zero. When you convert the coefficients to the oringal space from the scaled space, the intercept can be computed by w0 = y - \sum x_n w_n where x_n is the average of column n. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 10:49 AM, Bui, Tri tri@verizonwireless.com wrote: Thanks for the confirmation. Fyi..The code below works for similar dataset, but with the feature magnitude changed, LBFGS converged to the right weights. Example, time sequential Feature value 1, 2, 3, 4, 5, would generate the error while sequential feature 14111, 14112, 14113,14115 would converge to the right weight. Why? Below is code to implement standardscaler() for sample data (10246.0,[14111.0,1.0])): val scaler1 = new StandardScaler().fit(train.map(x = x.features)) val train1 = train.map(x = (x.label, scaler1.transform(x.features))) But I keeps on getting error: value features is not a member of (Double, org.apache.spark.mllib.linalg.Vector) Should my feature vector be .toInt instead of Double? Also, the error org.apache.spark.mllib.linalg.Vector should have an s to match import library org.apache.spark.mllib.linalg.Vectors Thanks Tri -Original Message- From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com] Sent: Friday, December 12, 2014 12:16 PM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Do I need to applied feature scaling via StandardScaler for LBFGS for Linear Regression? You need to do the StandardScaler to help the convergency yourself. LBFGS just takes whatever objective function you provide without doing any scaling. I will like to provide LinearRegressionWithLBFGS which does the scaling internally in the nearly feature. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 8:49 AM, Bui, Tri tri@verizonwireless.com.invalid wrote: Hi, Trying to use LBFGS as the optimizer, do I need to implement feature scaling via StandardScaler or does LBFGS do it by default? Following code generated error “ Failure again! Giving up and returning, Maybe the objective is just poorly behaved ?”. val data = sc.textFile(file:///data/Train/final2.train) val parsedata = data.map { line = val partsdata = line.split(',') LabeledPoint(partsdata(0).toDouble, Vectors.dense(partsdata(1).split(' ').map(_.toDouble))) } val train = parsedata.map(x = (x.label, MLUtils.appendBias(x.features))).cache() val numCorrections = 10 val convergenceTol = 1e-4 val maxNumIterations = 50 val regParam = 0.1 val initialWeightsWithIntercept = Vectors.dense(new Array[Double](2)) val (weightsWithIntercept, loss) = LBFGS.runLBFGS(train, new LeastSquaresGradient(), new SquaredL2Updater(), numCorrections, convergenceTol, maxNumIterations, regParam, initialWeightsWithIntercept) Did I implement LBFGS for Linear Regression via “LeastSquareGradient()” correctly? 
Thanks Tri - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: resource allocation spark on yarn
Hi, FYI - There are no Worker JVMs used when Spark is launched under YARN. Instead the NodeManager in YARN does what the Worker JVM does in Spark Standalone mode. For YARN you'll want to look into the following settings: --num-executors: controls how many executors will be allocated --executor-memory: RAM for each executor --executor-cores: CPU cores for each executor Also, look into the following for Dynamic Allocation: spark.dynamicAllocation.enabled spark.dynamicAllocation.minExecutors spark.dynamicAllocation.maxExecutors spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) spark.dynamicAllocation.schedulerBacklogTimeout (M) spark.dynamicAllocation.executorIdleTimeout (K) Link to Dynamic Allocation code (with comments on how to use this feature): https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala On Fri, Dec 12, 2014 at 10:52 AM, gpatcham gpatc...@gmail.com wrote: Hi All, I have spark on yarn and there are multiple spark jobs on the cluster. Sometimes some jobs are not getting enough resources even when there are enough free resources available on cluster, even when I use below settings --num-workers 75 \ --worker-cores 16 Jobs stick with the resources what they get when job started. Do we need to look at any other configs ? can some one give pointers on this issue. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/resource-allocation-spark-on-yarn-tp20664.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
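For reference, a sketch of what those settings can look like when set through SparkConf instead of command-line flags (the numbers are only examples):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.instances", "75")   // equivalent of --num-executors
  .set("spark.executor.memory", "8g")      // equivalent of --executor-memory
  .set("spark.executor.cores", "4")        // equivalent of --executor-cores
  // or, instead of fixing the executor count, let it grow and shrink with the load:
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "10")
  .set("spark.dynamicAllocation.maxExecutors", "75")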
Re: Do I need to apply feature scaling via StandardScaler for LBFGS for Linear Regression?
You can do something like the following.

val rddVector = input.map { case (response, vec) =>
  val newVec = MLUtils.appendBias(vec)
  newVec.toBreeze(newVec.size - 1) = response
  newVec
}

val scalerWithResponse = new StandardScaler(true, true).fit(rddVector)

val trainingData = scalerWithResponse.transform(rddVector).map { x =>
  (x(x.size - 1), Vectors.dense(x.toArray.slice(0, x.size - 1)))
}

Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 12:23 PM, Bui, Tri tri@verizonwireless.com wrote: Thanks for the info. How do I use StandardScaler() to scale example data (10246.0,[14111.0,1.0]) ? Thx tri -Original Message- From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com] Sent: Friday, December 12, 2014 1:26 PM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Do I need to apply feature scaling via StandardScaler for LBFGS for Linear Regression? It seems that your response is not scaled, which will cause an issue in LBFGS. Typically, people train Linear Regression with zero-mean/unit-variance features and response without training the intercept. Since the response is zero-mean, the intercept will always be zero. When you convert the coefficients to the original space from the scaled space, the intercept can be computed by w0 = y - \sum x_n w_n where x_n is the average of column n. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 10:49 AM, Bui, Tri tri@verizonwireless.com wrote: Thanks for the confirmation. FYI, the code below works for a similar dataset, but with the feature magnitude changed, LBFGS converged to the right weights. For example, a time-sequential feature with values 1, 2, 3, 4, 5 would generate the error, while the sequential feature 14111, 14112, 14113, 14115 would converge to the right weight. Why? Below is code to implement StandardScaler() for sample data (10246.0,[14111.0,1.0]): val scaler1 = new StandardScaler().fit(train.map(x => x.features)) val train1 = train.map(x => (x.label, scaler1.transform(x.features))) But I keep getting the error: value features is not a member of (Double, org.apache.spark.mllib.linalg.Vector) Should my feature vector be .toInt instead of Double? Also, the error org.apache.spark.mllib.linalg.Vector should have an s to match the import library org.apache.spark.mllib.linalg.Vectors Thanks Tri -Original Message- From: dbt...@dbtsai.com [mailto:dbt...@dbtsai.com] Sent: Friday, December 12, 2014 12:16 PM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Do I need to apply feature scaling via StandardScaler for LBFGS for Linear Regression? You need to do the StandardScaler to help the convergence yourself. LBFGS just takes whatever objective function you provide without doing any scaling. I would like to provide LinearRegressionWithLBFGS, which does the scaling internally, in the near future. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Fri, Dec 12, 2014 at 8:49 AM, Bui, Tri tri@verizonwireless.com.invalid wrote: Hi, Trying to use LBFGS as the optimizer, do I need to implement feature scaling via StandardScaler or does LBFGS do it by default? The following code generated the error “Failure again! Giving up and returning, Maybe the objective is just poorly behaved ?”.
val data = sc.textFile(file:///data/Train/final2.train) val parsedata = data.map { line = val partsdata = line.split(',') LabeledPoint(partsdata(0).toDouble, Vectors.dense(partsdata(1).split(' ').map(_.toDouble))) } val train = parsedata.map(x = (x.label, MLUtils.appendBias(x.features))).cache() val numCorrections = 10 val convergenceTol = 1e-4 val maxNumIterations = 50 val regParam = 0.1 val initialWeightsWithIntercept = Vectors.dense(new Array[Double](2)) val (weightsWithIntercept, loss) = LBFGS.runLBFGS(train, new LeastSquaresGradient(), new SquaredL2Updater(), numCorrections, convergenceTol, maxNumIterations, regParam, initialWeightsWithIntercept) Did I implement LBFGS for Linear Regression via “LeastSquareGradient()” correctly? Thanks Tri - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail:
Re: Spark Server - How to implement
Hey Manoj, One proposal potentially of interest is the Spark Kernel project from IBM - you should look for their. The interface in that project is more of a remote REPL interface, i.e. you submit commands (as strings) and get back results (as strings), but you don't have direct programmatic access to state like in the JobServer. Not sure if this is what you need. https://issues.apache.org/jira/browse/SPARK-4605 This type of higher level execution context is something we've generally defined to be outside of scope for the core Spark distribution because they can be cleanly built on the stable API, and from what I've seen of different applications that build on Spark, the requirements are fairly different for different applications. I'm guessing that in the next year we'll see a handful of community projects pop up around providing various types of execution services for spark apps. - Patrick On Fri, Dec 12, 2014 at 10:06 AM, Manoj Samel manojsamelt...@gmail.com wrote: Thanks Marcelo. Spark Gurus/Databricks team - do you have something in roadmap for such a spark server ? Thanks, On Thu, Dec 11, 2014 at 5:43 PM, Marcelo Vanzin van...@cloudera.com wrote: Oops, sorry, fat fingers. We've been playing with something like that inside Hive: https://github.com/apache/hive/tree/spark/spark-client That seems to have at least a few of the characteristics you're looking for; but it's a very young project, and at this moment we're not developing it as a public API, but mostly for internal Hive use. It can give you a few ideas, though. Also, SPARK-3215. On Thu, Dec 11, 2014 at 5:41 PM, Marcelo Vanzin van...@cloudera.com wrote: Hi Manoj, I'm not aware of any public projects that do something like that, except for the Ooyala server which you say doesn't cover your needs. We've been playing with something like that inside Hive, though: On Thu, Dec 11, 2014 at 5:33 PM, Manoj Samel manojsamelt...@gmail.com wrote: Hi, If spark based services are to be exposed as a continuously available server, what are the options? * The API exposed to client will be proprietary and fine grained (RPC style ..), not a Job level API * The client API need not be SQL so the Thrift JDBC server does not seem to be option .. but I could be wrong here ... * Ooyala implementation is a REST API for job submission, but as mentioned above; the desired API is a finer grain API, not a job submission Any existing implementation? Is it build your own server? Any thoughts on approach to use ? Thanks, -- Marcelo -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: resource allocation spark on yarn
but on spark 0.9 we don't have these options --num-executors: controls how many executors will be allocated --executor-memory: RAM for each executor --executor-cores: CPU cores for each executor On Fri, Dec 12, 2014 at 12:27 PM, Sameer Farooqui same...@databricks.com wrote: Hi, FYI - There are no Worker JVMs used when Spark is launched under YARN. Instead the NodeManager in YARN does what the Worker JVM does in Spark Standalone mode. For YARN you'll want to look into the following settings: --num-executors: controls how many executors will be allocated --executor-memory: RAM for each executor --executor-cores: CPU cores for each executor Also, look into the following for Dynamic Allocation: spark.dynamicAllocation.enabled spark.dynamicAllocation.minExecutors spark.dynamicAllocation.maxExecutors spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N) spark.dynamicAllocation.schedulerBacklogTimeout (M) spark.dynamicAllocation.executorIdleTimeout (K) Link to Dynamic Allocation code (with comments on how to use this feature): https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala On Fri, Dec 12, 2014 at 10:52 AM, gpatcham gpatc...@gmail.com wrote: Hi All, I have spark on yarn and there are multiple spark jobs on the cluster. Sometimes some jobs are not getting enough resources even when there are enough free resources available on cluster, even when I use below settings --num-workers 75 \ --worker-cores 16 Jobs stick with the resources what they get when job started. Do we need to look at any other configs ? can some one give pointers on this issue. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/resource-allocation-spark-on-yarn-tp20664.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
IBM open-sources Spark Kernel
We are happy to announce a developer preview of the Spark Kernel which enables remote applications to dynamically interact with Spark. You can think of the Spark Kernel as a remote Spark Shell that uses the IPython notebook interface to provide a common entrypoint for any application. The Spark Kernel obviates the need to submit jars using spark-submit, and can replace the existing Spark Shell. You can try out the Spark Kernel today by installing it from our github repo at https://github.com/ibm-et/spark-kernel. To help you get a demo environment up and running quickly, the repository also includes a Dockerfile and a Vagrantfile to build a Spark Kernel container and connect to it from an IPython notebook. We have included a number of documents with the project to help explain it and provide how-to information: * A high-level overview of the Spark Kernel and its client library ( https://issues.apache.org/jira/secure/attachment/12683624/Kernel%20Architecture.pdf ). * README (https://github.com/ibm-et/spark-kernel/blob/master/README.md) - building and testing the kernel, and deployment options including building the Docker container and packaging the kernel. * IPython instructions ( https://github.com/ibm-et/spark-kernel/blob/master/docs/IPYTHON.md) - setting up the development version of IPython and connecting a Spark Kernel. * Client library tutorial ( https://github.com/ibm-et/spark-kernel/blob/master/docs/CLIENT.md) - building and using the client library to connect to a Spark Kernel. * Magics documentation ( https://github.com/ibm-et/spark-kernel/blob/master/docs/MAGICS.md) - the magics in the kernel and how to write your own. We think the Spark Kernel will be useful for developing applications for Spark, and we are making it available with the intention of improving these capabilities within the context of the Spark community ( https://issues.apache.org/jira/browse/SPARK-4605). We will continue to develop the codebase and welcome your comments and suggestions. Signed, Chip Senkbeil IBM Emerging Technology Software Engineer
Re: Submiting multiple jobs via different threads
Haoming If the Spark UI states that one of the jobs is in the Waiting state, this is a resources issue. You will need to set properties such as: spark.executor.memory spark.cores.max Set these so that each instance only takes a portion of the available worker memory and cores. Regards, Mike -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Submiting-multiple-jobs-via-different-threads-tp7948p20669.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
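A minimal sketch of capping a single application so that several can run side by side on one cluster (the values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("job-one")
  .set("spark.cores.max", "4")        // leave cores free for the other submitted jobs
  .set("spark.executor.memory", "2g") // take only part of each worker's memory
val sc = new SparkContext(conf)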
GraphX for large scale PageRank (~4 billion nodes, ~128 billion edges)
Hi! tldr; We're looking at potentially using Spark+GraphX to compute PageRank over a 4 billion node + 128 billion edge graph on a regular (monthly) basis, possibly growing larger in size over time. If anyone has hints / tips / upcoming optimizations I should test use (or wants to contribute -- we'll pay the EC2 credits!) for running large scale graph processing with Spark+GraphX on EC2+S3, I'd love to hear it! =] First, I must say, I'm quite excited for the rise in the Spark ecosystem -- Spark makes life so much easier -- and it's for that very reason I'm looking to use it for some of our processing work at the tech non-profit http://commoncrawl.org/. To improve our crawl, we're aiming to run PageRank monthly on our crawl archives. As we can run the whole pipeline in Spark+GraphX, it's really quite a tempting proposition for us. For that reason, I've been looking at replicating the existing experiments from the GraphX: Unifying Data-Parallel and Graph-Parallel Analytics paper to make sure my general setup and experimental methodology are sound before attempting to scale up to the larger dataset. One issue is that the paper states the hardware, but not the convergence tolerance or number of iterations for PageRank. I've read a separate figure in a Spark presentation (http://www.graphanalysis.org/IPDPS2014-workshop/Gonzales.pdf) that reports 68 seconds for 10 iterations, but no clue if 10 iterations are comparable. I'm using the Spark EC2 spin up scripts, and other than some minor issues such as Ganglia failing[1], getting a cluster running has been positive and smooth sailing. === Hardware: All experiments were done with either 16 m2.4xlarge nodes or 32 r3.xlarge machines. That has comparable RAM and CPU to the machines used in the paper whilst also having SSD instead of magnetic disk. I've found more machines can work better for downloading large amounts of data from S3, where our target dataset will be stored. === Replicating experiments: I've been able to replicate what I believe to be the loose results for the LiveJournal and Twitter graphs, assuming the paper was run for 10 iterations. LiveJournal runs 6 seconds per iteration and Twitter runs 47 seconds per iteration for example. I've also tested the Twitter graph with serialization and compressed RDD (snappy) is turned on, which bumps it up to ~100 seconds per iteration. On the larger graphs, I start hitting trouble. Using portions of the 4 billion node graph, specifically 157 million nodes + 3.6 billion edges and 326 million nodes + 9.1 billion edges, things start getting more complicated however. The first and most persistent is Java heap space exceptions. OpenHashSet in EdgePartitionBuilder seems to be a common culprit -- possibly as there could still be millions or even billions of unique nodes in a single partition. There is also a consistent java.lang.Long cannot be cast to scala.Tuple2 class cast exceptions [2] produced by the SortShuffleWriter / ExternalSorter but it's slowly getting done - primarily as some of the partitions don't seem to need the external sort or don't trigger the error. Potentially due to using SORT instead of HASH, but I've not tested that yet. === Code: I've put the code up on GitHub under the terrible name graphx-prank. https://github.com/Smerity/graphx-prank The given code allows me to run large graphs locally on my laptop for testing, whereas the GraphX PageRank example tends to fail at the memory intensive parts. 
I can run LiveJournal on my underpowered laptop using my code, for example, but not using the GraphX PageRank example. The aim is for the code to be used as part of a pipeline at Common Crawl for extracting a hyperlink graph, computing PageRank over it, then storing the results to determine what we crawl next. === General questions and comments: (a) Is this size of graph sane to use with GraphX yet with 50 m2.4xlarge nodes or 100 r3.xlarge machines? I know optimizations for insanely large graphs are coming in but I'm potentially a little early? (b) Any improvements / optimizations for running Spark and GraphX at this scale? (c) Are there any other example scripts in the wild, especially of good techniques or optimized Spark/GraphX usage? I'm new to both Scala, Spark, and GraphX, so I'm always looking for good resources. P.S. The email was more easily readable before I had to remove the HTML to get it past the mailing list spam filter. Sorry. === [1]: I think it's related to Starting httpd: httpd: Syntax error on line 153 of /etc/httpd/conf/httpd.conf: Cannot load modules/mod_authn_alias.so into server: /etc/httpd/modules/mod_authn_alias.so: cannot open shared object file: No such file or directory but I've not spent time investigating as Ganglia is less of a concern for me right now. [2]: java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple2 org.apache.spark.graphx.impl.RoutingTableMessageSerializer$$anon$1$$anon$2.writeObject(Serializers.scala:39)
Re: how to convert an rdd to a single output file
The objective is to let the Spark application generate a file in a format which can be consumed by other programs - as I said I am willing to give up parallelism at this stage (all the expensive steps were earlier but do want an efficient way to pass once through an RDD without the requirement to hold it in memory as a list. On Fri, Dec 12, 2014 at 12:22 PM, Sameer Farooqui same...@databricks.com wrote: Instead of doing this on the compute side, I would just write out the file with different blocks initially into HDFS and then use hadoop fs -getmerge or HDFSConcat to get one final output file. - SF On Fri, Dec 12, 2014 at 11:19 AM, Steve Lewis lordjoe2...@gmail.com wrote: I have an RDD which is potentially too large to store in memory with collect. I want a single task to write the contents as a file to hdfs. Time is not a large issue but memory is. I say the following converting my RDD (scans) to a local Iterator. This works but hasNext shows up as a separate task and takes on the order of 20 sec for a medium sized job - is *toLocalIterator a bad function to call in this case and is there a better one?* *public void writeScores(final Appendable out, JavaRDDIScoredScan scans) { writer.appendHeader(out, getApplication());IteratorIScoredScan scanIterator = scans.toLocalIterator();while(scanIterator.hasNext()) { IScoredScan scan = scanIterator.next();writer.appendScan(out, getApplication(), scan);}writer.appendFooter(out, getApplication());}* -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: how to convert an rdd to a single output file
what would good spill settings be? On Fri, Dec 12, 2014 at 2:45 PM, Sameer Farooqui same...@databricks.com wrote: You could try re-partitioning or coalescing the RDD to partition and then write it to disk. Make sure you have good spill settings enabled so that the RDD can spill to the local temp dirs if it has to. On Fri, Dec 12, 2014 at 2:39 PM, Steve Lewis lordjoe2...@gmail.com wrote: The objective is to let the Spark application generate a file in a format which can be consumed by other programs - as I said I am willing to give up parallelism at this stage (all the expensive steps were earlier but do want an efficient way to pass once through an RDD without the requirement to hold it in memory as a list. On Fri, Dec 12, 2014 at 12:22 PM, Sameer Farooqui same...@databricks.com wrote: Instead of doing this on the compute side, I would just write out the file with different blocks initially into HDFS and then use hadoop fs -getmerge or HDFSConcat to get one final output file. - SF On Fri, Dec 12, 2014 at 11:19 AM, Steve Lewis lordjoe2...@gmail.com wrote: I have an RDD which is potentially too large to store in memory with collect. I want a single task to write the contents as a file to hdfs. Time is not a large issue but memory is. I say the following converting my RDD (scans) to a local Iterator. This works but hasNext shows up as a separate task and takes on the order of 20 sec for a medium sized job - is *toLocalIterator a bad function to call in this case and is there a better one?* *public void writeScores(final Appendable out, JavaRDDIScoredScan scans) {writer.appendHeader(out, getApplication());IteratorIScoredScan scanIterator = scans.toLocalIterator();while(scanIterator.hasNext()) {IScoredScan scan = scanIterator.next(); writer.appendScan(out, getApplication(), scan);} writer.appendFooter(out, getApplication());}* -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
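If giving up parallelism for the final write is acceptable, another pattern (a sketch, not tied to the IScoredScan code above) is to shrink the RDD to one partition and let that single task write the file on the cluster, instead of iterating on the driver:

// outputLines stands in for an RDD[String] of already-formatted records
outputLines
  .coalesce(1, shuffle = true)   // shuffle = true keeps the upstream stages parallel
  .saveAsTextFile("hdfs:///output/scores")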
Spark SQL API Doc IsCached as SQL command
Hello, A few questions on Spark SQL: 1) Does Spark SQL support an equivalent SQL query for the Scala command isCached(tableName)? 2) Is there a documentation spec I can reference for questions like this? The closest doc I can find is this one: https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#deploying-in-existing-hive-warehouses Thanks, Judy
Re: Spark SQL API Doc IsCached as SQL command
http://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory On Fri, Dec 12, 2014 at 3:14 PM, Judy Nash judyn...@exchange.microsoft.com wrote: Hello, Few questions on Spark SQL: 1) Does Spark SQL support equivalent SQL Query for Scala command: IsCached(table name) ? 2) Is there a documentation spec I can reference for question like this? Closest doc I can find is this one: https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#deploying-in-existing-hive-warehouses Thanks, Judy
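For the first question specifically: the check is exposed on SQLContext/HiveContext rather than as a SQL statement; a quick sketch (the table name is a placeholder, and as far as I know there is no ISCACHED SQL query):

// Programmatic API
sqlContext.cacheTable("my_table")
val cached: Boolean = sqlContext.isCached("my_table")
sqlContext.uncacheTable("my_table")

// SQL statements exist for the caching itself
sqlContext.sql("CACHE TABLE my_table")
sqlContext.sql("UNCACHE TABLE my_table")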
Re: IBM open-sources Spark Kernel
Hi Sam, We developed the Spark Kernel with a focus on the newest version of the IPython message protocol (5.0) for the upcoming IPython 3.0 release. We are building around Apache Spark's REPL, which is used in the current Spark Shell implementation. The Spark Kernel was designed to be extensible through magics ( https://github.com/ibm-et/spark-kernel/blob/master/docs/MAGICS.md), providing functionality that might be needed outside the Scala interpreter. Finally, a big part of our focus is on application development. Because of this, we are providing a client library for applications to connect to the Spark Kernel without needing to implement the ZeroMQ protocol. Signed, Chip Senkbeil From: Sam Bessalah samkiller@gmail.com To: Robert C Senkbeil/Austin/IBM@IBMUS Date: 12/12/2014 04:20 PM Subject:Re: IBM open-sources Spark Kernel Wow. Thanks. Can't wait to try this out. Great job. How Is it different from Iscala or Ispark? On Dec 12, 2014 11:17 PM, Robert C Senkbeil rcsen...@us.ibm.com wrote: We are happy to announce a developer preview of the Spark Kernel which enables remote applications to dynamically interact with Spark. You can think of the Spark Kernel as a remote Spark Shell that uses the IPython notebook interface to provide a common entrypoint for any application. The Spark Kernel obviates the need to submit jars using spark-submit, and can replace the existing Spark Shell. You can try out the Spark Kernel today by installing it from our github repo at https://github.com/ibm-et/spark-kernel. To help you get a demo environment up and running quickly, the repository also includes a Dockerfile and a Vagrantfile to build a Spark Kernel container and connect to it from an IPython notebook. We have included a number of documents with the project to help explain it and provide how-to information: * A high-level overview of the Spark Kernel and its client library ( https://issues.apache.org/jira/secure/attachment/12683624/Kernel%20Architecture.pdf ). * README (https://github.com/ibm-et/spark-kernel/blob/master/README.md) - building and testing the kernel, and deployment options including building the Docker container and packaging the kernel. * IPython instructions ( https://github.com/ibm-et/spark-kernel/blob/master/docs/IPYTHON.md) - setting up the development version of IPython and connecting a Spark Kernel. * Client library tutorial ( https://github.com/ibm-et/spark-kernel/blob/master/docs/CLIENT.md) - building and using the client library to connect to a Spark Kernel. * Magics documentation ( https://github.com/ibm-et/spark-kernel/blob/master/docs/MAGICS.md) - the magics in the kernel and how to write your own. We think the Spark Kernel will be useful for developing applications for Spark, and we are making it available with the intention of improving these capabilities within the context of the Spark community ( https://issues.apache.org/jira/browse/SPARK-4605). We will continue to develop the codebase and welcome your comments and suggestions. Signed, Chip Senkbeil IBM Emerging Technology Software Engineer
SVMWithSGD.run source code
I'm looking at the source code of SVM.scala and trying to find the location of the source code of the following function: def train(...): SVMModel = { new SVMWithSGD( ... ).run(input, initialWeights) } I'm wondering where I can find the code for SVMWithSGD().run()? I'd like to see the implementation of the function run(). Thanks! Caron - Thanks! -Caron -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SVMWithSGD-run-source-code-tp20671.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SVMWithSGD.run source code
class SVMWithSGD is defined in the same file you're already looking at. It inherits the run() method from its superclass, GeneralizedLinearAlgorithm. An IDE would help you trace this right away. On Sat, Dec 13, 2014 at 12:52 AM, Caron caron.big...@gmail.com wrote: I'm looking at the source code of SVM.scala and trying to find the location of the source code of the following function: def train(...): SVMModel = { new SVMWithSGD( ... ).run(input, initialWeights) } I'm wondering where I can find the code for SVMWithSGD().run()? I'd like to see the implementation of the function run(). Thanks! Caron - Thanks! -Caron -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SVMWithSGD-run-source-code-tp20671.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
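For readers who want to see run() exercised end to end, here is a minimal sketch of training an SVM with MLlib; SVMWithSGD.train constructs an SVMWithSGD instance and delegates to run(), which it inherits from GeneralizedLinearAlgorithm. The data path and iteration count are placeholders, and the snippet assumes a spark-shell session where sc is already defined.

    import org.apache.spark.mllib.classification.SVMWithSGD
    import org.apache.spark.mllib.util.MLUtils

    // Load LabeledPoint data in LIBSVM format (path is a placeholder).
    val training = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt")

    // train() builds an SVMWithSGD instance and calls the inherited run().
    val numIterations = 100
    val model = SVMWithSGD.train(training, numIterations)

    // Score one example with the fitted model.
    val score = model.predict(training.first().features)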
sbt assembly with hive
What is the proper way to build with Hive from sbt? The SPARK_HIVE environment variable is deprecated. However, after running the following:
sbt -Pyarn -Phadoop-2.3 -Phive assembly/assembly
and then, in bin/pyspark:
hivectx = HiveContext(sc)
hivectx.hiveql("select * from my_table")
Exception: (You must build Spark with Hive. Export 'SPARK_HIVE=true' and run sbt/sbt assembly, Py4JError(u'Trying to call a package.',))
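For reference, the Hive-enabled build commands documented for Spark releases of this era were roughly the following (profiles and Hadoop versions here are illustrative); the key point is that bin/pyspark must then be launched against the assembly jar this build produces, not an older assembly built without Hive classes:

    # sbt build with the Hive profile, run from the Spark source root
    sbt/sbt -Pyarn -Phadoop-2.3 -Phive assembly

    # Maven equivalent
    mvn -Pyarn -Phadoop-2.3 -Phive -DskipTests clean package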
clean up of state in State Dstream
I am using *updateStateByKey* to maintain state in my streaming application, and the state accumulates over time. Is there a way I can delete the old state data or put a limit on the amount of state the state DStream can keep in the system? Thanks, Sunil.
Re: sbt assembly with hive
I am getting the same message when trying to get a HiveContext in CDH 5.1 after enabling Spark. I think Spark should come with Hive enabled as the default option, since the Hive metastore is a common way to share data, given the popularity of Hive and other SQL-over-Hadoop technologies like Impala. Thanks, Abhi On Fri, Dec 12, 2014 at 6:40 PM, Stephen Boesch java...@gmail.com wrote: What is the proper way to build with Hive from sbt? The SPARK_HIVE environment variable is deprecated. However, after running the following: sbt -Pyarn -Phadoop-2.3 -Phive assembly/assembly and then, in bin/pyspark: hivectx = HiveContext(sc) hivectx.hiveql("select * from my_table") Exception: (You must build Spark with Hive. Export 'SPARK_HIVE=true' and run sbt/sbt assembly, Py4JError(u'Trying to call a package.',)) -- Abhi Basu
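As a quick sanity check that the assembly in use was actually built with -Phive, a minimal Scala session like the one below (run from bin/spark-shell; the table name is a placeholder) only works when the Hive classes made it into the assembly:

    import org.apache.spark.sql.hive.HiveContext

    // Instantiation alone fails if the assembly was built without -Phive.
    val hiveCtx = new HiveContext(sc)

    // Query a table registered in the Hive metastore (name is a placeholder).
    val rows = hiveCtx.sql("SELECT * FROM my_table")
    rows.collect().foreach(println)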
Re: clean up of state in State Dstream
If you no longer need to maintain state for a key, just return None for that value and it gets removed. From: Sunil Yarram yvsu...@gmail.com Date: Friday, December 12, 2014 at 9:44 PM To: user@spark.apache.org Subject: clean up of state in State DStream I am using updateStateByKey to maintain state in my streaming application, and the state accumulates over time. Is there a way I can delete the old state data or put a limit on the amount of state the state DStream can keep in the system? Thanks, Sunil.
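To make that suggestion concrete, here is a minimal sketch of an update function that drops a key by returning None once its state is no longer needed; the running-count threshold is an arbitrary illustration, and pairs stands for whatever keyed DStream the job already produces.

    import org.apache.spark.streaming.StreamingContext._

    // Keep a running count per key, but drop the key entirely (return None)
    // once the count crosses an illustrative threshold, so its entry is
    // removed from the state DStream.
    val maxCount = 1000
    val updateFunc = (newValues: Seq[Int], state: Option[Int]) => {
      val newCount = state.getOrElse(0) + newValues.sum
      if (newCount > maxCount) None else Some(newCount)
    }

    // pairs: DStream[(String, Int)] built earlier in the streaming job.
    val stateDStream = pairs.updateStateByKey[Int](updateFunc)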
Re: resource allocation spark on yarn
Hi, In addition to the options Sameer mentioned, we need to enable the external shuffle service, right? Thanks, - Tsuyoshi
On Sat, Dec 13, 2014 at 5:27 AM, Sameer Farooqui same...@databricks.com wrote: Hi, FYI - there are no Worker JVMs used when Spark is launched under YARN. Instead, the NodeManager in YARN does what the Worker JVM does in Spark Standalone mode. For YARN you'll want to look into the following settings:
--num-executors: controls how many executors will be allocated
--executor-memory: RAM for each executor
--executor-cores: CPU cores for each executor
Also, look into the following for Dynamic Allocation:
spark.dynamicAllocation.enabled
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (N)
spark.dynamicAllocation.schedulerBacklogTimeout (M)
spark.dynamicAllocation.executorIdleTimeout (K)
Link to Dynamic Allocation code (with comments on how to use this feature): https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
On Fri, Dec 12, 2014 at 10:52 AM, gpatcham gpatc...@gmail.com wrote: Hi All, I have Spark on YARN and there are multiple Spark jobs on the cluster. Sometimes some jobs are not getting enough resources even when there are enough free resources available on the cluster, even when I use the settings below:
--num-workers 75 \
--worker-cores 16
Jobs stick with the resources they get when the job starts. Do we need to look at any other configs? Can someone give pointers on this issue? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/resource-allocation-spark-on-yarn-tp20664.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- - Tsuyoshi - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
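Putting those options together, a spark-submit invocation along the following lines illustrates both approaches; the numbers, class name, and jar are placeholders, and note that dynamic allocation on YARN also requires the external shuffle service to be enabled on the NodeManagers:

    # Static sizing: fix the number, memory, and cores of executors up front.
    ./bin/spark-submit --master yarn-cluster \
      --num-executors 20 --executor-memory 8g --executor-cores 4 \
      --class com.example.MyApp my-app-assembly.jar

    # Or let Spark grow and shrink executors with dynamic allocation.
    ./bin/spark-submit --master yarn-cluster \
      --executor-memory 8g --executor-cores 4 \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.dynamicAllocation.minExecutors=5 \
      --conf spark.dynamicAllocation.maxExecutors=50 \
      --conf spark.shuffle.service.enabled=true \
      --class com.example.MyApp my-app-assembly.jar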
Re: Read data from SparkStreaming from Java socket.
Yes, socketTextStream starts a TCP client that tries to connect to a TCP server (localhost: in your case). If there is a server running on that port that can send data to connected clients, then you will receive data in the stream. Did you check out the quick example in the streaming programming guide? http://spark.apache.org/docs/latest/streaming-programming-guide.html That has instructions to start a netcat server and send data to Spark Streaming through it. TD
On Fri, Dec 12, 2014 at 9:54 PM, Akhil Das ak...@sigmoidanalytics.com wrote: socketTextStream is a socket client which will read from a TCP ServerSocket. Thanks Best Regards
On Fri, Dec 12, 2014 at 7:21 PM, Guillermo Ortiz konstt2...@gmail.com wrote: I don't understand what Spark Streaming's socketTextStream is waiting for... is it like a server, so you just have to send data from a client? Or what is it expecting?
2014-12-12 14:19 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: I have created a ServerSocket program which you can find over here https://gist.github.com/akhld/4286df9ab0677a555087 It simply listens to the given port and, when a client connects, it will send the contents of the given file. I'm attaching the executable jar also; you can run the jar as: java -jar SocketBenchmark.jar student 12345 io Here, student is the file which will be sent to whichever client connects on port 12345; I have tested it and it works with Spark Streaming (socketTextStream). Thanks Best Regards
On Fri, Dec 12, 2014 at 6:25 PM, Guillermo Ortiz konstt2...@gmail.com wrote: Hi, I'm a newbie with Spark. I'm just trying to use Spark Streaming and filter some data sent with a Java socket, but it's not working... it works when I use ncat. Why is it not working? My Spark code is just this:
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Test")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("localhost", )
val errorLines = lines.filter(_.contains("hello"))
errorLines.print()
I created a client socket which sends data to that port, but it could connect to any address; I guess Spark doesn't work like a ServerSocket... what's the way to send data from a Java socket so that it can be read with socketTextStream? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
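If you would rather test from code than from netcat, a bare-bones Scala server along these lines can play the role of the data source; the port and the lines it sends are arbitrary, and socketTextStream connects to it as a plain TCP client:

    import java.io.PrintWriter
    import java.net.ServerSocket

    object SimpleLineServer {
      def main(args: Array[String]): Unit = {
        // Listen on an arbitrary port; the streaming receiver connects here.
        val server = new ServerSocket(9999)
        val socket = server.accept()   // blocks until socketTextStream connects
        val out = new PrintWriter(socket.getOutputStream, true)

        // Emit one newline-terminated record per second.
        (1 to 100).foreach { i =>
          out.println(s"hello world $i")
          Thread.sleep(1000)
        }

        out.close()
        socket.close()
        server.close()
      }
    }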
Re: Spark SQL API Doc IsCached as SQL command
There isn't a SQL statement that directly maps to SQLContext.isCached, but you can use EXPLAIN EXTENDED to check whether the underlying physical plan is an InMemoryColumnarTableScan. On 12/13/14 7:14 AM, Judy Nash wrote: Hello, A few questions on Spark SQL: 1) Does Spark SQL support an equivalent SQL query for the Scala command isCached(tableName)? 2) Is there a documentation spec I can reference for questions like this? The closest doc I can find is this one: https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#deploying-in-existing-hive-warehouses Thanks, Judy
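As a small illustration of both sides of that answer (the table name is a placeholder, and the EXPLAIN statement assumes a context, such as a HiveContext, whose SQL dialect supports it):

    // Programmatic check from Scala.
    sqlContext.cacheTable("my_table")
    val cached: Boolean = sqlContext.isCached("my_table")

    // SQL-only check: inspect the physical plan and look for
    // InMemoryColumnarTableScan in the output.
    sqlContext.sql("EXPLAIN EXTENDED SELECT * FROM my_table")
      .collect()
      .foreach(println)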