merge elements in a Spark RDD under custom condition
Hi Pro, I want to merge elements in a Spark RDD when two elements satisfy a certain condition. Suppose there is an RDD[Seq[Int]] where some of the Seq[Int] contain overlapping elements. The task is to merge all overlapping Seq[Int] in this RDD and store the result in a new RDD. For example, given RDD[Seq[Int]] = [[1,2,3], [2,4,5], [1,2], [7,8,9]], the result should be [[1,2,3,4,5], [7,8,9]]. Since the RDD[Seq[Int]] is very large, I cannot do this in the driver program. Is it possible to get it done using distributed groupBy/map/reduce, etc.? Thanks in advance, Pengcheng
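Because the merge is transitive ([1,2] joins [1,2,3], which joins [2,4,5]), this is essentially a connected-components problem: treat each Int as a vertex and each Seq[Int] as edges linking its members. A minimal sketch along those lines using GraphX (the function name and the choice of GraphX are mine, not from the original post):

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

def mergeOverlapping(sets: RDD[Seq[Int]]): RDD[Seq[Int]] = {
  // Link every element of a Seq to its head; a singleton Seq yields a
  // self-loop, so lone sets survive the merge.
  val edges = sets.flatMap(s => s.map(x => Edge(x.toLong, s.head.toLong, 1)))
  val graph = Graph.fromEdges(edges, 0)
  // connectedComponents() labels each vertex with the smallest vertex id
  // reachable from it; grouping by that label collects the merged sets.
  graph.connectedComponents().vertices
    .map { case (vertex, component) => (component, vertex.toInt) }
    .groupByKey()
    .map(_._2.toSeq.distinct.sorted)
}

Everything stays distributed; only the component labels are shuffled, never the full sets through the driver.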
Re: java.io.InvalidClassException: org.apache.spark.api.java.JavaUtils$SerializableMapWrapper; no valid constructor
The workaround was to wrap the map returned by the Spark libraries into a HashMap and then broadcast it. Could anyone please let me know if there is an issue open for this?
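A minimal sketch of the workaround described above, assuming the map comes from a Java-API call such as countByValue() (the variable names, and countByValue() as the source of the wrapper, are illustrative):

import java.util.HashMap

// countByValue() on the Java API returns a map wrapped in
// JavaUtils$SerializableMapWrapper, which fails to deserialize with
// "no valid constructor"; copying it into a plain java.util.HashMap avoids that.
val wrapped: java.util.Map[String, java.lang.Long] = javaRdd.countByValue()
val plain = new HashMap[String, java.lang.Long](wrapped)
val broadcastMap = sc.broadcast(plain)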
akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down
Hello spark users! I found lots of strange messages in the driver log. Here is one:

2014-12-01 11:54:23,849 [sparkDriver-akka.actor.default-dispatcher-25] ERROR akka.remote.EndpointWriter[akka://sparkDriver/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkExecutor%40data1.hadoop%3A17372-5/endpointWriter] - AssociationError [akka.tcp://sparkDriver@10.54.87.173:55034] - [akka.tcp://sparkExecutor@data1.hadoop:17372]: Error [Shut down address: akka.tcp://sparkExecutor@data1.hadoop:17372] [ akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkExecutor@data1.hadoop:17372 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. ]

I get this message twice for every worker: first for driverPropsFetcher and then for sparkExecutor. It looks like Spark shuts down the remote Akka system incorrectly, or there is a race condition in this process: the driver sends some data to a worker whose actor system is already in the shutdown state. Apart from this message everything works fine, but it is an ERROR-level message and it ends up in my ERROR-only log. Do you have any idea whether this is a configuration issue, a bug in Spark or Akka, or something else? Thanks!
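If the goal is simply to keep this shutdown-time noise out of an ERROR-only log, one possible workaround (an assumption on my part, not a fix for the underlying race) is to demote the offending Akka logger in log4j.properties:

# Hypothetical log4j.properties entry: suppress the EndpointWriter
# AssociationError raised while executors shut down; FATAL keeps
# genuinely fatal messages visible.
log4j.logger.akka.remote.EndpointWriter=FATAL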
Re: java.io.InvalidClassException: org.apache.spark.api.java.JavaUtils$SerializableMapWrapper; no valid constructor
SerializableMapWrapper was added in https://issues.apache.org/jira/browse/SPARK-3926; do you mind opening a new JIRA and linking it to that one?
Re: Spark SQL 1.0.0 - RDD from snappy compress avro file
Hi Vikas and Simone, thanks for the replies. Yeah, I understand this would be easier with 1.2, but that is completely out of my control; I really have to work with 1.0.0. With Simone's approach, the imports fail:

scala> import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat }
<console>:17: error: object mapreduce is not a member of package org.apache.avro

scala> import org.apache.avro.mapred.AvroKey
<console>:17: error: object mapred is not a member of package org.apache.avro

scala> import com.twitter.chill.avro.AvroSerializer
<console>:18: error: object avro is not a member of package com.twitter.chill
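All three errors say those packages are missing from the classpath rather than anything Spark-version-specific, so the Avro mapred/mapreduce classes and chill-avro are probably simply not on it. A hedged sbt sketch of the dependencies that provide those packages (versions are guesses appropriate to the era; verify against your build):

libraryDependencies ++= Seq(
  "org.apache.avro" % "avro" % "1.7.6",
  // avro-mapred provides both org.apache.avro.mapred and org.apache.avro.mapreduce
  "org.apache.avro" % "avro-mapred" % "1.7.6",
  // chill-avro provides com.twitter.chill.avro.AvroSerializer
  "com.twitter" %% "chill-avro" % "0.4.0"
)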
Re: Creating a SchemaRDD from an existing API
Hi Michael, About this new data source API: what types of data sources would it support? Does it necessarily have to be an RDBMS? Cheers On Sat, Nov 29, 2014 at 12:57 AM, Michael Armbrust mich...@databricks.com wrote: You probably don't need to create a new kind of SchemaRDD. Instead I'd suggest taking a look at the data sources API that we are adding in Spark 1.2. There is not a ton of documentation, but the test cases show how to implement the various interfaces https://github.com/apache/spark/tree/master/sql/core/src/test/scala/org/apache/spark/sql/sources, and there is an example library for reading Avro data https://github.com/databricks/spark-avro. On Thu, Nov 27, 2014 at 10:31 PM, Niranda Perera nira...@wso2.com wrote: Hi, I am evaluating Spark for an analytics component where we do batch processing of data using SQL, so I am particularly interested in Spark SQL and in creating a SchemaRDD from an existing API [1]. This API exposes elements in a database as data sources; using the methods allowed by this data source, we can access and edit data. So I want to create a custom SchemaRDD using the methods and provisions of this API. I tried going through the Spark documentation and the Java docs, but unfortunately I was unable to come to a final conclusion on whether this is actually possible. I would like to ask the Spark devs: 1. As of the current Spark release, can we make a custom SchemaRDD? 2. What is the extension point for a custom SchemaRDD? Are there particular interfaces? 3. Could you please point me to the specific docs on this matter? Your help in this regard is highly appreciated. Cheers [1] https://github.com/wso2-dev/carbon-analytics/tree/master/components/xanalytics -- *Niranda Perera* Software Engineer, WSO2 Inc. Mobile: +94-71-554-8430 Twitter: @n1r44 https://twitter.com/N1R44
RE: Unable to compile spark 1.1.0 on windows 8.1
Hi Judy, Thank you for your response. When I try to compile using Maven with "mvn -Dhadoop.version=1.2.1 -DskipTests clean package" I get the error "Error: Could not find or load main class". I have Maven 3.0.4. When I run "sbt package" I get the same exception as earlier. I have done the following steps: 1. Downloaded spark-1.1.0.tgz from the Spark site and unpacked it to d:\myworkplace\software\spark-1.1.0. 2. Downloaded sbt-0.13.7.zip and extracted it to d:\myworkplace\software\sbt. 3. Updated the PATH environment variable to include d:\myworkplace\software\sbt\bin. 4. Navigated to the Spark folder d:\myworkplace\software\spark-1.1.0. 5. Ran the command "sbt assembly". 6. As a side effect of this command a number of libraries are downloaded, and I get an initial error that the path C:\Users\ishwardeep.singh\.sbt\0.13\staging\ec3aa8f39111944cc5f2\sbt-pom-reader does not exist. 7. I manually created the subfolder ec3aa8f39111944cc5f2\sbt-pom-reader and retried, only to get the error described in my initial post. Is this the correct procedure to compile Spark 1.1.0? Please let me know. Hoping to hear from you soon. Regards, ishwardeep
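One guess, offered as an assumption rather than a confirmed diagnosis: on Windows, "Error: Could not find or load main class" from mvn is commonly caused by a quoted or malformed MAVEN_OPTS value, and the Spark build needs extra heap in any case. A sketch for cmd.exe:

rem Set MAVEN_OPTS without surrounding quotes; a quoted value can make the JVM
rem treat it as a class name, producing "Could not find or load main class".
set MAVEN_OPTS=-Xmx2g -XX:MaxPermSize=512m
mvn -Dhadoop.version=1.2.1 -DskipTests clean package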
Kryo exception for CassandraSQLRow
I am using the Cassandra-Spark connector to pull data from Cassandra, process it, and write it back to Cassandra. Now I am getting the following exception, apparently from Kryo serialization. Does anyone know what the reason is and how it can be solved? I also tried to register org.apache.spark.sql.cassandra.CassandraSQLRow via kryo.register, but even this did not solve the problem and the exception remains.

WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 7, ip-X-Y-Z): com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.spark.sql.cassandra.CassandraSQLRow
Serialization trace:
_2 (org.apache.spark.util.MutablePair)
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1171)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1218)
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:904)
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:904)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

I am using Spark 1.1.0 with the cassandra-spark connector 1.1.0; here is the build:

"org.apache.spark" % "spark-mllib_2.10" % "1.1.0" exclude("com.google.guava", "guava"),
"com.google.guava" % "guava" % "16.0" % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" exclude("com.google.guava", "guava") withSources() withJavadoc(),
"org.apache.cassandra" % "cassandra-all" % "2.1.1" exclude("com.google.guava", "guava"),
"org.apache.cassandra" % "cassandra-thrift" % "2.1.1" exclude("com.google.guava", "guava"),
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.2" exclude("com.google.guava", "guava"),
"org.apache.spark" %% "spark-core" % "1.1.0" % "provided" exclude("com.google.guava", "guava") exclude("org.apache.hadoop", "hadoop-core"),
"org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided" exclude("com.google.guava", "guava"),
"org.apache.spark" %% "spark-catalyst" % "1.1.0" % "provided" exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),
"org.apache.spark" %% "spark-sql" % "1.1.0" % "provided" exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),
"org.apache.spark" %% "spark-hive" % "1.1.0" % "provided" exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),
"org.apache.hadoop" % "hadoop-client" % "1.0.4" % "provided"

best, /Shahab
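"Unable to find class" from Kryo usually means the class is missing from the executor classpath rather than merely unregistered, so shipping the connector jar to the executors (e.g. via spark-submit --jars, or by building an assembly jar) is the first thing to check. If registration is also wanted, a minimal Spark 1.1 sketch using a custom KryoRegistrator (the class name and the Class.forName indirection are my own):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

class ConnectorRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Loaded by name so this compiles even without the connector on the
    // compile-time classpath; it must still be present at runtime.
    kryo.register(Class.forName("org.apache.spark.sql.cassandra.CassandraSQLRow"))
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[ConnectorRegistrator].getName)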
Spark 1.1.0: weird spark-shell behavior
Hello, I have two weird effects when working with spark-shell: 1. The following code throws the exception below when executed in spark-shell, yet it works perfectly when submitted with spark-submit:

import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.mahout.math.VectorWritable
import com.google.common.io.ByteStreams
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val hConf = HBaseConfiguration.create()
hConf.set("hbase.defaults.for.version.skip", "true")
hConf.set("hbase.defaults.for.version", "0.98.6-cdh5.2.0")
hConf.set(HConstants.ZOOKEEPER_QUORUM, "myserv")
hConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
hConf.set(TableInputFormat.INPUT_TABLE, "MyNS:MyTable")
val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rdd.count()

--- Exception ---
14/12/01 10:45:24 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.ExceptionInInitializerError
at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:197)
at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:159)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:101)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:113)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: hbase-default.xml file seems to be for and old version of HBase (null), this version is 0.98.6-cdh5.2.0
at org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:73)
at org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:105)
at org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:116)
at org.apache.hadoop.hbase.client.HConnectionManager.<clinit>(HConnectionManager.java:222)
... 14 more

We have already checked most of the trivial stuff with classpaths and the existence of tables and column groups, enabled the HBase-specific settings to avoid the version checking, and so on. It appears that the supplied HBase configuration is completely ignored by the context. We tried to work around this by instantiating our own SparkContext and encountered the second weird effect: 2. When attempting to instantiate our own SparkContext we get the exception below ("Die Adresse wird bereits verwendet" is German for "the address is already in use"):

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)

--- Exception ---
2014-12-01 10:42:24,966 WARN o.e.j.u.c.AbstractLifeCycle - FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Die Adresse wird bereits verwendet
java.net.BindException: Die Adresse wird bereits verwendet
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:199)
at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
at org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1449)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
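On the second effect: spark-shell has already created a SparkContext (the sc variable) whose web UI is bound to port 4040, which is why a second context fails with "address already in use". A minimal sketch of the usual ways around it (the alternate UI port is an arbitrary choice of mine):

// Option 1: reuse the shell's built-in context instead of creating one.
val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

// Option 2: stop the built-in context first, then build your own,
// optionally on a different UI port.
sc.stop()
val conf = new SparkConf().setAppName("Simple Application").set("spark.ui.port", "4041")
val mySc = new SparkContext(conf)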
Kafka+Spark-streaming issue: Stream 0 received 0 blocks
Hi, I am integrating Kafka and Spark, using spark-streaming. I have created a topic as a kafka producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

I am publishing messages to kafka and trying to read them using spark-streaming java code and display them on screen. The daemons are all up: Spark master and worker, zookeeper, kafka. I am writing the java code using KafkaUtils.createStream; the code is below:

package com.spark;

import scala.Tuple2;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
    public static void main(String args[]) {
        if (args.length != 3) {
            System.out.println("Usage: spark-submit --class com.spark.SparkStream target/SparkStream-with-dependencies.jar zookeeper_ip group_name topic1,topic2,...");
            System.exit(1);
        }

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topic = args[2].split(",");
        for (String t : topic) {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> message) {
                System.out.println("NewMessage: " + message._2()); // for debugging
                return message._2();
            }
        });
        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}

I am running the job, and at another terminal I am running the kafka producer to publish messages:

#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Hi kafka
second message
another message

But the output logs at the spark-streaming console don't show the messages; they show zero blocks received:

--- Time: 1417107363000 ms ---
14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for time 1417107363000 ms (execution: 0.000 s)
14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 1417107363000 ms
14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 ms
14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks

Why isn't the data block getting received? I have tried the kafka producer-consumer on the console (bin/kafka-console-producer and bin/kafka-console-consumer) and it works perfectly, but why not the code above? Please help me. Regards, Aiman Sarosh
Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
It says:

14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

A quick guess would be that you are giving the wrong master url (spark://192.168.88.130:7077). Open the web UI running on port 8080 and use the master url listed there in the top left corner of the page. Thanks Best Regards
RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
Hi, The spark master is working, and I have given the same url in the code: (inline screenshot of the Spark master web UI) The warning is gone, and the new log is:

--- Time: 141742785 ms ---
INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0 from job set of time 141742785 ms
INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0 from job set of time 141742785 ms
INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785 ms (execution: 0.001 s)
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 25
INFO [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 24
INFO [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time 141742785 ms
INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
--- Time: 1417427853000 ms ---
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 from job set of time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms (execution: 0.001 s)
INFO [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 27
INFO [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
INFO [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager (Logging.scala:logInfo(59)) - Removing RDD 26
INFO [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417427853000 ms
INFO [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should be my approach now? Need urgent help. Regards, Aiman
RE: Kryo exception for CassandraSQLRow
Don't know if this'll solve it, but if you're on Spark 1.1, the Cassandra Connector version 1.1.0 final fixed the guava back-compat issue. Maybe taking out the guava exclusions might help?
Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
I see you have no worker machines to execute the job (inline screenshot: the master web UI lists no workers). You haven't configured your spark cluster properly. A quick fix to get it running would be to run it in local mode; for that change this line

JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));

to this

JavaStreamingContext jssc = new JavaStreamingContext("local[4]", "SparkStream", new Duration(3000));

Thanks Best Regards
Re: Setting network variables in spark-shell
Don't set `spark.akka.frameSize` to 1. The max value of `spark.akka.frameSize` is 2047. The unit is MB. Best Regards, Shixiong Zhu 2014-12-01 0:51 GMT+08:00 Yanbo yanboha...@gmail.com: Try to use spark-shell --conf spark.akka.frameSize=1 On Dec 1, 2014, at 12:25 AM, Brian Dolan buddha_...@yahoo.com.INVALID wrote: Howdy Folks, What is the correct syntax in 1.0.0 to set networking variables in spark-shell? Specifically, I'd like to set spark.akka.frameSize. I'm attempting this: spark-shell -Dspark.akka.frameSize=1 --executor-memory 4g Only to get this within the session: System.getProperty("spark.executor.memory") res0: String = 4g System.getProperty("spark.akka.frameSize") res1: String = null I don't believe I am violating protocol, but I have also posted this to SO: http://stackoverflow.com/questions/27215288/how-do-i-set-spark-akka-framesize-in-spark-shell ~~ May All Your Sequences Converge
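For what it's worth, a small sketch of how one might set and then verify the property from inside the shell; treat the exact flag support as version-dependent (the --conf flag may not exist yet in 1.0.x, so the driver JVM options route is the hedge here, and 64 is just an example value):

// Launch (1.0.x era):
//   spark-shell --driver-java-options "-Dspark.akka.frameSize=64"
// Then check the value the context actually picked up, via the SparkConf
// rather than System.getProperty:
sc.getConf.get("spark.akka.frameSize")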
Is Spark the right tool for me?
Hi all, I need some advice on whether Spark is the right tool for my zoo. My requirements share commonalities with „big data“, workflow coordination and „reactive“ event-driven data processing (as in, for example, Haskell Arrows), which doesn’t make it any easier to decide on a tool set. NB: I have asked a similar question on the Storm mailing list, but have been deferred to Spark. I previously thought Storm was closer to my needs – but maybe neither is. To explain my needs it’s probably best to give an example scenario: * A user uploads small files (typically 1-200 files, file size typically 2-10MB per file) * Files should be converted in parallel and on available nodes. The conversion is actually done via native tools, so there is not so much big data processing required, but rather dynamic parallelization (so, for example, splitting the conversion step into as many conversion tasks as there are files). The conversion typically takes between several minutes and a few hours. * The converted files are gathered and stored in a single database (containing geometries for rendering) * Once the db is ready, a web map server is (re-)configured and the user can make small updates to the data set via a web UI. * … Some other data processing steps which I leave out for brevity … * There will initially be only a few concurrent users, but the system shall be able to scale if needed My current thoughts: * I should avoid uploading files into the distributed storage during conversion; each conversion filter should probably instead download the file it is actually converting from a shared place. Otherwise it’s bad for scalability reasons (too many redundant copies of the same temporary files if there are many concurrent users and many cluster nodes). * Apache Oozie seems an option for chaining my pipes into a workflow. But is it a good fit with Spark? What options do I have with Spark to chain a workflow from pipes? * Apache Crunch seems to make it easy to dynamically parallelize tasks (Oozie itself can’t do this). But I may not need Crunch after all if I have Spark, and it also doesn’t seem to fit my last problem below. * The part that causes me the most headache is the user-interactive db update: I am considering Kafka as a message bus to broker between the web UI and a custom db handler (nb, the db is a SQLite file). But how about update responsiveness; isn’t it the case that Spark will cause some lag (as opposed to Storm)? * The db handler probably has to be implemented as a long-running, continuing task, so that when a user sends some changes the handler writes these to the db file. However, I want this to be decoupled from the job. So these file updates should be done locally, only on the machine that started the job, for the whole lifetime of this user interaction. Does Spark allow creating such long-running tasks dynamically, so that when another (web) user starts a new task, a new long-running task is created and run on the same node, which eventually ends and triggers the next task? Also, is it possible to identify a running task, so that a long-running task can be bound to a session (db handler working on local db updates, until the task is done), and eventually restarted / recreated on failure? ~Ben
Re: Is Spark the right tool for me?
Not quite sure which geo processing you're doing: is it raster, vector? More info would be appreciated for me to help you further. Meanwhile I can try to give some hints. For instance, did you consider GeoMesa http://www.geomesa.org/2014/08/05/spark/? Since you need a WMS (or alike), did you consider GeoTrellis http://geotrellis.io/ (go to the batch processing)? When you say SQLite, do you mean that you're using SpatiaLite, or is your db not a geo one and it's plain SQLite? In case you need an r-tree (or related) index, your headaches will come from congestion within your database transactions... unless you go to a dedicated database like Vertica (just mentioning). kr, andy
ensuring RDD indices remain immutable
I have an RDD that serves as a feature look-up table downstream in my analysis. I create it using zipWithIndex(), and because the elements of the RDD could end up in a different order if it is regenerated at any point, I cache it to try to ensure that the (feature -> index) mapping remains fixed. However, I'm having trouble verifying that this is actually robust. Can someone comment on whether such a mapping is stable, or is there another preferred method? zipWithUniqueId() isn't optimal since the max ID generated that way is always greater than the number of features, so I'm trying to avoid it.
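Caching alone is only best-effort: evicted or lost partitions get recomputed, and zipWithIndex depends on partition order. A hedged sketch of making the assignment deterministic and then pinning it down (the sort key and save path are illustrative):

import org.apache.spark.rdd.RDD

def buildIndex(features: RDD[String]): RDD[(String, Long)] = {
  // A deterministic order (here: lexicographic) makes zipWithIndex
  // reproducible even if the RDD is regenerated from scratch.
  features.distinct().sortBy(identity).zipWithIndex()
}

// assuming `features` is your RDD[String] of feature names
val index = buildIndex(features)
index.cache()
// For a truly immutable mapping, persist it and read it back later
// instead of recomputing:
index.saveAsObjectFile("hdfs:///tmp/feature-index")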
Re: Is Spark the right tool for me?
Thank you for mentioning GeoTrellis. I hadn’t heard of this before. We have many custom tools and steps; I’ll check whether our tools fit in. The end result is actually a 3D map for native OpenGL-based rendering on iOS / Android [1]. I’m using GeoPackage, which is basically SQLite with an R-Tree and a small library around it (more lightweight than SpatiaLite). I want to avoid accessing the SQLite db from any other machine or task; that’s where I thought I could use a long-running task which is the only process responsible for updating a locally stored SQLite db file. As you also said, SQLite (or nearly any other file-based db) won’t work well over the network. This isn’t limited to the R-Tree but is an expected limitation because of file-locking issues, as documented by SQLite. I thought to do the same thing when rendering the (web) maps: in combination with the db handler which does the actual changes, run a map server instance on each node and configure it to add the database location as a map source once the task starts. Cheers Ben [1] http://www.deep-map.com
Re: Is Spark the right tool for me?
… Sorry, I forgot to mention why I’m basically bound to SQLite. The workflow involves more data processing than I mentioned. There are several tools in the chain which either rely on SQLite as an exchange format, or do processing such as data cleaning that runs orders of magnitude faster, or with far fewer resources, than with a heavyweight db for these specialized (and temporary) tasks.
How take top N of top M from RDD as RDD
Hi, I have a problem. It is easy in plain Scala code, but I cannot take the top N from an RDD as an RDD. Given the StudentScore records, take the top 10 ages, and then take the top 10 from each age; the result is 100 records. The Scala code is below, but how can I do it with RDDs, since RDD.take returns an Array rather than another RDD? Example Scala code:
import scala.util.Random
case class StudentScore(age: Int, num: Int, score: Int, name: Int)
val scores = for { i <- 1 to 1 } yield {
  StudentScore(Random.nextInt(100), Random.nextInt(100), Random.nextInt(), Random.nextInt())
}
def takeTop(scores: Seq[StudentScore], byKey: StudentScore => Int): Seq[(Int, Seq[StudentScore])] = {
  val groupedScore = scores.groupBy(byKey)
    .map { case (_, _scores) => (_scores.foldLeft(0)((acc, v) => acc + v.score), _scores) }.toSeq
  groupedScore.sortBy(_._1).take(10)
}
val topScores = for {
  (_, ageScores) <- takeTop(scores, _.age)
  (_, numScores) <- takeTop(ageScores, _.num)
} yield { numScores }
topScores.size
-- ~Yours, Xuefeng Wu/吴雪峰 敬上
Re: Is Spark the right tool for me?
Indeed. However, I guess the important load and stress is in the processing of the 3D data (DEM or alike) into geometries/shades/whatever. Hence you can use Spark (GeoTrellis can be tricky for 3D, poke @lossyrob for more info) to perform these operations, then keep an RDD of only the resulting geometries. Those geometries probably won't be that heavy, hence it might be possible to coalesce(1, true) to have the whole thing on one node (or, if your driver is beefy enough, do a collect/foreach) to create the index. You could also create a GeoJSON of the geometries and create the r-tree on it (not sure about this one). On Mon Dec 01 2014 at 3:38:00 PM Stadin, Benjamin benjamin.sta...@heidelberg-mobil.com wrote: Thank you for mentioning GeoTrellis. I hadn't heard of it before. We have many custom tools and steps; I'll check how our tools fit in. The end result is actually a 3D map for native OpenGL based rendering on iOS / Android [1]. I'm using GeoPackage, which is basically SQLite with R-Tree and a small library around it (more lightweight than SpatiaLite). I want to avoid accessing the SQLite db from any other machine or task; that's where I thought I could use a long-running task which is the only process responsible for updating a locally stored SQLite db file. As you also said, SQLite (or mostly any other file-based db) won't work well over the network. This isn't limited to R-Tree but is an expected limitation because of file locking issues, as also documented by SQLite. I thought to do the same thing when rendering the (web) maps: in combination with the db handler which does the actual changes, I thought to run a map server instance on each node and configure it to add the database location as a map source once the task starts. Cheers Ben [1] http://www.deep-map.com …
Re: ensuring RDD indices remain immutable
I think the robust thing to do is sort the RDD, and then zipWithIndex. Even if the RDD is recomputed, the ordering and thus the assignment of IDs should be the same. On Mon, Dec 1, 2014 at 2:36 PM, rok rokros...@gmail.com wrote: I have an RDD that serves as a feature look-up table downstream in my analysis. I create it using zipWithIndex(), and because I suppose that the elements of the RDD could end up in a different order if it is regenerated at any point, I cache it to try and ensure that the (feature -> index) mapping remains fixed. However, I'm having trouble verifying that this is actually robust -- can someone comment on whether such a mapping should be stable, or is there another preferred method? zipWithUniqueId() isn't optimal since the max ID generated this way is always greater than the number of features, so I'm trying to avoid it.
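A minimal sketch of the sort-then-index approach Sean describes (the feature values are made up, and sc is a SparkContext as in a spark-shell session): sorting first fixes a deterministic order, so the assigned indices stay stable even if the RDD is recomputed.
val features = sc.parallelize(Seq("f3", "f1", "f2"))
// sortBy fixes a deterministic ordering before indices are assigned
val indexed = features.sortBy(identity).zipWithIndex()
indexed.collect().foreach(println)  // (f1,0), (f2,1), (f3,2)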
Time based aggregation in Real time Spark Streaming
Hi, My incoming messages have a time stamp as one field, and I have to perform aggregation over a 3 minute time slice. Message sample:
Item ID  Item Type  timeStamp
1        X          1-12-2014:12:01
1        X          1-12-2014:12:02
1        X          1-12-2014:12:03
1        y          1-12-2014:12:04
1        y          1-12-2014:12:05
1        y          1-12-2014:12:06
Aggregation result:
ItemId  ItemType  count  aggregationStartTime  aggrEndTime
1       X         3      1-12-2014:12:01       1-12-2014:12:03
1       y         3      1-12-2014:12:04       1-12-2014:12:06
What is the best way to perform time based aggregation in Spark? Kindly suggest. Thanks
Problem creating EC2 cluster using spark-ec2
I've been trying to create a Spark cluster on EC2 using the documentation at https://spark.apache.org/docs/latest/ec2-scripts.html (with Spark 1.1.1). Running the script successfully creates some EC2 instances, HDFS etc., but appears to fail to copy the actual files needed to run Spark across. I ran the following commands:
$ cd ~/src/spark-1.1.1/ec2
$ ./spark-ec2 --key-pair=* --identity-file=* --slaves=1 --region=eu-west-1 --zone=eu-west-1a --instance-type=m3.medium --no-ganglia launch foocluster
I see the following in the script's output:
(instance and HDFS set up happens here) ...
Persistent HDFS installed, won't start by default...
~/spark-ec2 ~/spark-ec2
Setting up spark-standalone
RSYNC'ing /root/spark/conf to slaves... *.eu-west-1.compute.amazonaws.com
RSYNC'ing /root/spark-ec2 to slaves... *.eu-west-1.compute.amazonaws.com
./spark-standalone/setup.sh: line 22: /root/spark/sbin/stop-all.sh: No such file or directory
./spark-standalone/setup.sh: line 27: /root/spark/sbin/start-master.sh: No such file or directory
./spark-standalone/setup.sh: line 33: /root/spark/sbin/start-slaves.sh: No such file or directory
Setting up tachyon
RSYNC'ing /root/tachyon to slaves...
... (Tachyon setup happens here without any problem)
I can ssh to the master (using ./spark-ec2 login), and looking in /root/, it contains:
$ ls /root
ephemeral-hdfs hadoop-native mapreduce persistent-hdfs scala shark spark spark-ec2 tachyon
If I look in /root/spark (where the sbin directory should be found), it only contains a single 'conf' directory:
$ ls /root/spark
conf
Any idea why spark-ec2 might have failed to copy these files across? Thanks, Dave
Re: ensuring RDD indices remain immutable
True, though I was hoping to avoid having to sort... maybe there's no way around it. Thanks!
Re: Spark SQL 1.0.0 - RDD from snappy compress avro file
Btw, the same error from above also happens on 1.1.0 (just tested).
Re: How take top N of top M from RDD as RDD
For converting an Array or any List to an RDD, we can try using: sc.parallelize(groupedScore) // or whatever the name of the list variable is On Mon, Dec 1, 2014 at 8:14 PM, Xuefeng Wu ben...@gmail.com wrote: …
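A tiny self-contained sketch of that round trip (the data is made up): take() pulls the top rows back to the driver as an Array, and parallelize() turns that Array back into an RDD for further distributed work.
val scores = sc.parallelize(Seq((1, 10), (2, 30), (3, 20)))
// take() materializes the top rows on the driver as an Array...
val topTwo: Array[(Int, Int)] = scores.sortBy(-_._2).take(2)
// ...and parallelize() distributes that Array again as an RDD
val topTwoRdd = sc.parallelize(topTwo)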
Re: Spark Job submit
Or setting the HADOOP_CONF_DIR environment variable. Either way, you must have the YARN configuration available to the submitting application to allow the use of "yarn-client" or "yarn-cluster". The attached stack trace below doesn't provide any information as to why the job failed. mn On Nov 27, 2014, at 12:14 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Try to add your cluster's core-site.xml, yarn-site.xml, and hdfs-site.xml to the CLASSPATH (and to SPARK_CLASSPATH) and submit the job. Thanks Best Regards On Thu, Nov 27, 2014 at 12:24 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: The code is on my Windows machine and the cluster is in some other network on UNIX. In this case how will it identify the cluster? In the case of a Spark cluster we can clearly specify the URL, like spark://ip:port. But in the case of Hadoop, how do we specify that? What I have done is copied the Hadoop configuration files from the network to a local dummy hadoop directory (on the Windows machine), and submitted with spark-submit after pointing the HADOOP_CONF_DIR variable at the dummy files' location. Attaching the error. Please suggest how to proceed from the code and how to execute spark-submit from a Windows machine. Please provide me sample code if you have any. -Naveen From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, November 26, 2014 10:03 PM To: Naveen Kumar Pokala Cc: user@spark.apache.org Subject: Re: Spark Job submit How about? - Create a SparkContext - setMaster as yarn-cluster - Create a JavaSparkContext with the above SparkContext And that will submit it to the yarn cluster. Thanks Best Regards On Wed, Nov 26, 2014 at 4:20 PM, Naveen Kumar Pokala npok...@spcapitaliq.com wrote: Hi. Is there a way to submit a Spark job to a Hadoop-YARN cluster from Java code? -Naveen
Re: Time based aggregation in Real time Spark Streaming
Hi, You can associate all the messages of a 3-minute interval with a unique key, then group by that key, and finally add up. Thanks On Dec 1, 2014 9:02 PM, pankaj pankaje...@gmail.com wrote: …
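A minimal sketch of that idea (the field names and timestamp format are assumptions based on the sample above): derive a 3-minute bucket from each record's own timestamp, use (itemId, itemType, bucket) as the key, and count with reduceByKey. The window start/end fall out of the bucket number, so no shared state is needed; note that buckets are aligned to multiples of 3 minutes since the epoch, which may not line up exactly with the 12:01 boundary in the sample.
import java.text.SimpleDateFormat
import org.apache.spark.SparkContext._
case class Event(itemId: Int, itemType: String, ts: String)
val events = sc.parallelize(Seq(
  Event(1, "X", "1-12-2014:12:01"), Event(1, "X", "1-12-2014:12:02"), Event(1, "X", "1-12-2014:12:03")))
val windowMs = 3 * 60 * 1000L
val counts = events.map { e =>
  // integer-divide the epoch time to get a stable 3-minute bucket id
  val bucket = new SimpleDateFormat("d-M-yyyy:HH:mm").parse(e.ts).getTime / windowMs
  ((e.itemId, e.itemType, bucket), 1L)
}.reduceByKey(_ + _).map { case ((id, tpe, bucket), n) =>
  // window boundaries are derived from the bucket id itself
  (id, tpe, n, bucket * windowMs, bucket * windowMs + windowMs - 1)
}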
Re: Mllib native netlib-java/OpenBLAS
Thanks for your reply, but I'm still running into issues installing/configuring the native libraries for MLlib. Here are the steps I've taken; please let me know if anything is incorrect. - Download the Spark source - unzip and compile using `mvn -Pnetlib-lgpl -DskipTests clean package` - Run `sbt/sbt publish-local` The last step fails with the following error (the full stack trace is attached here: error.txt http://apache-spark-user-list.1001560.n3.nabble.com/file/n20110/error.txt ): [error] (sql/compile:compile) java.lang.AssertionError: assertion failed: List(object package$DebugNode, object package$DebugNode) Do I still have to install OpenBLAS/anything else if I build Spark from source using the -Pnetlib-lgpl flag? Also, do I change the Spark version (from 1.1.0 to 1.2.0-SNAPSHOT) in the .sbt file for my app? Thanks!
Re: Time based aggregation in Real time Spark Streaming
Hi, Suppose I keep a batch size of 3 minutes. In one batch there can be incoming records with any time stamp, so it is difficult to keep track of when the 3 minute interval started and ended. I am doing output operations on the worker nodes in forEachPartition, not in the driver (forEachRdd), so I cannot use any shared variable to store the start/end time, because shared variables like accumulators are write-only within tasks. Is there any solution to this? Thanks
packaging from source gives protobuf compatibility issues.
scala> textFile.count() java.lang.VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$CompleteRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet; I tried ./make-distribution.sh -Dhadoop.version=2.5.0 and /usr/local/apache-maven-3.2.3/bin/mvn -Dhadoop.version=2.5.0 -DskipTests clean package; both give the same error. I am connecting to HDFS hosted on Hadoop version 2.5.0. I will appreciate any help anyone can provide! Thanks, Ami
Re: How take top N of top M from RDD as RDD
rdd.top collects it on the master... If you want top-k per key, run map / mapPartitions and use a bounded priority queue, then reduceByKey the queues. I experimented with top-k from Algebird and with a bounded priority queue wrapped over a Java priority queue (the Spark default)... the bounded priority queue is faster. A code example is here: https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3066 On Dec 1, 2014 6:46 AM, Xuefeng Wu ben...@gmail.com wrote: …
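A compact sketch of the per-key top-k pattern Debasish describes (the data and k are made up). For brevity this keeps a sorted Vector of at most k elements instead of a real bounded priority queue; a queue is more efficient, but the aggregateByKey shape is the same.
import org.apache.spark.SparkContext._
val k = 2
val scoresByAge = sc.parallelize(Seq((20, 5), (20, 9), (20, 7), (21, 3), (21, 8)))
val topK = scoresByAge.aggregateByKey(Vector.empty[Int])(
  (acc, v) => (acc :+ v).sortBy(-_).take(k),  // keep only the k largest seen so far in each partition
  (a, b) => (a ++ b).sortBy(-_).take(k)       // merge the per-partition results
)
topK.collect().foreach(println)  // (20,Vector(9, 7)), (21,Vector(8, 3))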
RE: Unable to compile spark 1.1.0 on windows 8.1
Have you checked out the wiki here? http://spark.apache.org/docs/latest/building-with-maven.html A couple of things I did differently from you: 1) I got the bits directly from github (https://github.com/apache/spark/). Use branch 1.1 for Spark 1.1. 2) Execute the maven command in cmd (powershell misses libraries sometimes). 3) Increase maven memory as suggested by the building-with-maven wiki. Hope this helps. -----Original Message----- From: Ishwardeep Singh [mailto:ishwardeep.si...@impetus.co.in] Sent: Monday, December 1, 2014 1:50 AM To: u...@spark.incubator.apache.org Subject: RE: Unable to compile spark 1.1.0 on windows 8.1 Hi Judy, Thank you for your response. When I try to compile using maven mvn -Dhadoop.version=1.2.1 -DskipTests clean package I get the error "Error: Could not find or load main class ." I have maven 3.0.4. And when I run the command sbt package I get the same exception as earlier. I have done the following steps: 1. Downloaded spark-1.1.0.tgz from the spark site and unzipped it to the folder d:\myworkplace\software\spark-1.1.0 2. Then I downloaded sbt-0.13.7.zip and extracted it to the folder d:\myworkplace\software\sbt 3. Updated the PATH environment variable to include d:\myworkplace\software\sbt\bin in the PATH. 4. Navigated to the spark folder d:\myworkplace\software\spark-1.1.0 5. Ran the command sbt assembly 6. As a side effect of this command a number of libraries are downloaded, and I get an initial error that the path C:\Users\ishwardeep.singh\.sbt\0.13\staging\ec3aa8f39111944cc5f2\sbt-pom-reader does not exist. 7. I manually created this subfolder ec3aa8f39111944cc5f2\sbt-pom-reader and retried, to get the next error as described in my initial message. Is this the correct procedure to compile spark 1.1.0? Please let me know. Hoping to hear from you soon. Regards, ishwardeep
How to Integrate openNLP with Spark
Hi, I am using the openNLP NER (Token Name Finder) for parsing unstructured data. In order to speed up my process (to quickly train models and analyze documents against those models), I want to use Spark. I saw on the web that it is possible to connect openNLP with Spark using UIMAFit, but I am not sure how to do so. Though Philip Ogren gave a very nice presentation at the Spark Summit, I am still confused. Can someone please provide an end to end example of this? I am new to Spark and UIMAFit, and recently started working with them. Thanks Nikhil Jain
Re: Creating a SchemaRDD from an existing API
No, it should support any data source that has a schema and can produce rows. On Mon, Dec 1, 2014 at 1:34 AM, Niranda Perera nira...@wso2.com wrote: Hi Michael, About this new data source API: what types of data sources would it support? Does it necessarily have to be an RDBMS? Cheers On Sat, Nov 29, 2014 at 12:57 AM, Michael Armbrust mich...@databricks.com wrote: You probably don't need to create a new kind of SchemaRDD. Instead I'd suggest taking a look at the data sources API that we are adding in Spark 1.2. There is not a ton of documentation, but the test cases show how to implement the various interfaces (https://github.com/apache/spark/tree/master/sql/core/src/test/scala/org/apache/spark/sql/sources), and there is an example library for reading Avro data (https://github.com/databricks/spark-avro). On Thu, Nov 27, 2014 at 10:31 PM, Niranda Perera nira...@wso2.com wrote: Hi, I am evaluating Spark for an analytics component where we do batch processing of data using SQL. So, I am particularly interested in Spark SQL and in creating a SchemaRDD from an existing API [1]. This API exposes elements in a database as data sources. Using the methods allowed by this data source, we can access and edit data. So, I want to create a custom SchemaRDD using the methods and provisions of this API. I tried going through the Spark documentation and the Java Docs, but unfortunately I was unable to come to a conclusion as to whether this is actually possible. I would like to ask the Spark devs: 1. As of the current Spark release, can we make a custom SchemaRDD? 2. What is the extension point for a custom SchemaRDD? Or are there particular interfaces? 3. Could you please point me to the specific docs regarding this matter? Your help in this regard is highly appreciated. Cheers [1] https://github.com/wso2-dev/carbon-analytics/tree/master/components/xanalytics -- Niranda Perera Software Engineer, WSO2 Inc. Mobile: +94-71-554-8430 Twitter: @n1r44 https://twitter.com/N1R44
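For reference, a rough sketch of what such a source can look like against the 1.2-era sources API (the shape follows the test cases linked above, but the exact signatures may differ between 1.2 snapshots, and the class names and data here are made up): a RelationProvider builds a relation from options, and a TableScan relation only needs a schema plus an RDD[Row].
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources._

// usable via: CREATE TEMPORARY TABLE t USING com.example.MySource OPTIONS (rows '10')
class MySource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) =
    MyRelation(parameters("rows").toInt)(sqlContext)
}

case class MyRelation(rows: Int)(@transient val sqlContext: SQLContext) extends TableScan {
  // any backing store works, as long as it can describe a schema...
  override def schema = StructType(StructField("value", IntegerType, nullable = false) :: Nil)
  // ...and produce an RDD of rows that match it
  override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(1 to rows).map(Row(_))
}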
Minimum cluster size for empirical testing
Hi everyone, I'm interested in empirically measuring how much faster Spark is in comparison to Hadoop for certain problems and an input corpus I currently work with (I've read Matei Zaharia's "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing" paper and I want to perform a similar test). I personally think measuring the difference in speed on a single 1-node cluster isn't enough, so I was wondering what you would recommend for this task, in regards to the number of nodes, specs, etc. I was thinking it could be possible to launch a couple of CDH5 VMs across a few computers, or do you think it would be easier to do it with Amazon EC2? I'm particularly interested in knowing what the overall experience is in this case and what your recommendations are (what other common problems to test and what kind of benchmarks). Have a great start of the week. Cheers Pablo Valdes Software Engineer | comScore, Inc. (NASDAQ:SCOR) pval...@comscore.com Av. Del Cóndor N° 520, oficina 202, Ciudad Empresarial, Comuna de Huechuraba, Santiago, CL
Re: How to use FlumeInputDStream in spark cluster?
Thank you very much for your reply. I have a cluster of 8 nodes: m1, m2, m3 .. m8. m1 is configured as the Spark master node; the rest of the nodes are all worker nodes. I also configured m3 as the History Server, but the history server fails to start. I ran FlumeEventCount on m1 using the right hostname and a port that is not used by any application. Here is the script I used to run FlumeEventCount:
#!/bin/bash
spark-submit --verbose --class org.apache.spark.examples.streaming.FlumeEventCount --deploy-mode client --master yarn-client --jars lib/spark-streaming-flume_2.10-1.1.0-cdh5.2.2-20141112.193826-1.jar /opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/examples/lib/spark-examples-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar m1.ptang.aerohive.com 1234
The same issue is observed after adding spark.ui.port=4321 in /etc/spark/conf/spark-defaults.conf. The following are the exceptions from the job run:
14/12/01 11:51:45 INFO JobScheduler: Finished job streaming job 1417463504000 ms.0 from job set of time 1417463504000 ms
14/12/01 11:51:45 INFO JobScheduler: Total delay: 1.465 s for time 1417463504000 ms (execution: 1.415 s)
14/12/01 11:51:45 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - org.jboss.netty.channel.ChannelException: Failed to bind to: m1.ptang.aerohive.com/192.168.10.22:1234
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.BindException: Cannot assign requested address
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
... 3 more
14/12/01 11:51:45 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 72, m8.ptang.aerohive.com): org.jboss.netty.channel.ChannelException: Failed to bind to: m1.ptang.aerohive.com/192.168.10.22:1234
org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)
org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
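The bind failure above is consistent with how the push-based Flume receiver works: the receiver runs on an executor (the lost task is on m8), and it tries to open a server socket on the host:port passed to FlumeEventCount, so an address belonging to m1 cannot be bound there. A hedged sketch of the usual fix, pointing the stream at the machine where the receiver will actually run (the hostname is illustrative, ssc is the application's StreamingContext, and the Flume agent's avro sink must target the same host:port):
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
// the host must be an address the receiving executor can bind
val stream = FlumeUtils.createStream(ssc, "m8.ptang.aerohive.com", 1234, StorageLevel.MEMORY_AND_DISK_SER_2)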
Remove added jar from spark context
Hi, Is there a way to REMOVE a jar (added via addJar) from a Spark context? We have a long running context used by the spark jobserver, but after trying to update versions of classes already on the class path via addJars, the context still runs the old versions. It would be helpful if I could remove the old jar from the context when adding the new one, to prevent running stale code.
RE: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD
Thanks Yanbo! That works! The only issue is that it won't print the predicted value from lp.features, from the code line below: model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() It prints the test input data correctly, but it keeps printing "0.0" as the predicted value. Thanks Tri From: Yanbo Liang [mailto:yanboha...@gmail.com] Sent: Thursday, November 27, 2014 12:22 AM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD Hi Tri, Maybe my latest response to your problem was lost. Whatever the case, the following code snippet runs correctly: val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt)) model.algorithm.setIntercept(true) Because all the setXXX() functions in StreamingLinearRegressionWithSGD return this.type, which is an instance of itself, we need to set the other configuration in a separate line without a return value. 2014-11-27 1:04 GMT+08:00 Bui, Tri tri@verizonwireless.com.invalid: Thanks Yanbo! Modified code below: val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingLinearRegression") val ssc = new StreamingContext(conf, Seconds(args(2).toLong)) val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse) val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse) val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt)).setNumIterations(args(4).toInt).setStepSize(.0001).algorithm.setIntercept(true) model.trainOn(trainingData) model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() ssc.start() ssc.awaitTermination() But I am getting a compile error: [error] /data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:54: value trainOn is not a member of org.apache.spark.mllib.regression.LinearRegressionWithSGD [error] model.trainOn(trainingData) [error] /data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:55: value predictOnValues is not a member of org.apache.spark.mllib.regression.LinearRegressionWithSGD [error] model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() [error] two errors found [error] (compile:compile) Compilation failed Thanks Tri From: Yanbo Liang [mailto:yanboha...@gmail.com] Sent: Tuesday, November 25, 2014 8:57 PM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD Hi Tri, setIntercept() is not a member function of StreamingLinearRegressionWithSGD; it's a member function of LinearRegressionWithSGD (GeneralizedLinearAlgorithm), which is a member variable (named algorithm) of StreamingLinearRegressionWithSGD. So you need to change your code to: val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt)) .algorithm.setIntercept(true) Thanks Yanbo 2014-11-25 23:51 GMT+08:00 Bui, Tri tri@verizonwireless.com.invalid: Thanks Liang! It was my bad: I fat-fingered one of the data points; after correcting it the result matches yours. I am still not able to get the intercept. I am getting [error] /data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:47: value setIntercept is not a member of org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD I tried the code below: val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt)) model.setIntercept(addIntercept = true).trainOn(trainingData) and: val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt)) .setIntercept(true) But I still get a compilation error. Thanks Tri From: Yanbo Liang [mailto:yanboha...@gmail.com] Sent: Tuesday, November 25, 2014 4:08 AM To: Bui, Tri Cc: user@spark.apache.org Subject: Re: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD The case runs correctly in my environment: 14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Model updated at time 141690890 ms 14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Current model: weights, [0.8588] Can you provide more detailed information if convenient? The intercept can be turned on as follows: val model = new StreamingLinearRegressionWithSGD() .algorithm.setIntercept(true) 2014-11-25 3:31 GMT+08:00 Bui, Tri tri@verizonwireless.com.invalid: Hi, I am getting an incorrect weights model from StreamingLinearRegressionWithSGD. One feature input data is: (1,[1]) (2,[2]) … . (20,[20]) The …
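Pulling the thread's resolution together, a minimal sketch of the working configuration (the number of features and the trainingData/testData streams are placeholders): the key point is that setIntercept lives on the wrapped algorithm, and the call must be a separate statement, because chaining through .algorithm leaves you holding the non-streaming LinearRegressionWithSGD.
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
val numFeatures = 2  // placeholder
val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
model.algorithm.setIntercept(true)  // separate statement keeps `model` a streaming regressor
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()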
StreamingLinearRegressionWithSGD
Hi Gurus, I did not look at the code yet. I wonder if StreamingLinearRegressionWithSGD (http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.html) is equivalent to LinearRegressionWithSGD (http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/regression/LinearRegressionWithSGD.html) with the starting weights of the current batch set to the ending weights of the last batch. Since RidgeRegressionModel (http://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/regression/RidgeRegressionModel.html) does not seem to have a streaming version, I just wonder if doing it this way will suffice. Thanks! J
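For reference, a sketch of the warm-start loop the question describes (the batches and parameters are made up, and whether the streaming class does anything beyond this internally is exactly the question): LinearRegressionWithSGD.train() accepts initial weights, so each batch can start from the previous result.
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
// stand-ins for the RDDs that would arrive as DStream batches
val batches = Seq(
  sc.parallelize(Seq(LabeledPoint(1.0, Vectors.dense(1.0)), LabeledPoint(2.0, Vectors.dense(2.0)))),
  sc.parallelize(Seq(LabeledPoint(3.0, Vectors.dense(3.0)))))
var weights = Vectors.zeros(1)
for (batch <- batches) {
  // warm-start each batch from the previous solution
  val model = LinearRegressionWithSGD.train(batch, 50, 0.1, 1.0, weights)
  weights = model.weights
}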
Spark SQL table Join, one task is taking long
Environment: Spark 1.1, a 4 node Spark and Hadoop dev cluster: 6 cores and 32 GB RAM each. Default serialization, standalone, no security. Data was sqooped from a relational DB to HDFS, and the data is partitioned across HDFS uniformly. I am reading a fact table about 8 GB in size and one small dim table from HDFS, then doing a join on them based on a criterion. The driver runs in the Spark shell on the Spark master. ContactDetail and DAgents are read as RDDs and already registered as tables. Each of these tables has 60 to 90 fields, and I am using a Product class. val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902") CDJoinQry.map(ta => ta(4)).count // result is a small number This works and returns the result fine. The Hadoop mapPartition reads and the creation of the RDDs are all fine. But in the count stage, I see that one task (out of 200) does a huge amount of shuffle write (some 1 GB or more) and takes about 1.1 seconds to complete, out of the 1.2 seconds of total execution time. This task is usually around the 3/4 mark (say 160/200) of the total tasks. While that task is running, one CPU on one worker node goes to 100% for the duration of the task. The rest of the tasks take a few ms and do only about 5 MB of shuffle write each. I have run it repeatedly and this happens regardless of which worker node this particular task runs on. I turned on Spark debug on all nodes to understand, but it was difficult to figure out from the logs where the delay comes from. There are no errors or retries in the logs. Not sure if I can post logs here for someone to look at; if so I can (about 10 MB). Also, not sure if it is normal in such a table join that one task takes most of the time. Let me know if you have any suggestions. Regards, Venkat
RE: hdfs streaming context
Try (hdfs:///localhost:8020/user/data/*) with 3 slashes. Thx tri -----Original Message----- From: Benjamin Cuthbert [mailto:cuthbert@gmail.com] Sent: Monday, December 01, 2014 4:41 PM To: user@spark.apache.org Subject: hdfs streaming context All, Is it possible to stream on an HDFS directory and listen for multiple files? I have tried the following: val sparkConf = new SparkConf().setAppName("HdfsWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = ssc.textFileStream("hdfs://localhost:8020/user/data/*") lines.filter(line => line.contains("GE")) lines.print() ssc.start() But I get: 14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 1417469742000 ms java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/* does not exist. at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456) at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107) at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
Re: hdfs streaming context
Have you tried just passing a path to ssc.textFileStream()? It monitors the path for new files by looking at mtime/atime; all new/touched files in the time window appear as an RDD in the DStream. On 1 December 2014 at 14:41, Benjamin Cuthbert cuthbert@gmail.com wrote: …
Re: hdfs streaming context
Yes, in fact, that's the only way it works. You need hdfs://localhost:8020/user/data, I believe. (No, it's not correct to write hdfs:///...) On Mon, Dec 1, 2014 at 10:41 PM, Benjamin Cuthbert cuthbert@gmail.com wrote: …
Re: hdfs streaming context
Thanks Sean, that worked: just removing the /* and leaving it as /user/data. Seems to be streaming in. On 1 Dec 2014, at 22:50, Sean Owen so...@cloudera.com wrote: …
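For the record, a minimal sketch of the working version from this thread (host and port as in the original message): textFileStream() takes a directory rather than a glob, and each batch interval yields the lines of whatever files newly appeared there. Note also that filter returns a new DStream, which the original snippet discarded.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// pass the directory itself, not a glob
val lines = ssc.textFileStream("hdfs://localhost:8020/user/data")
val geLines = lines.filter(_.contains("GE"))  // keep the filtered DStream
geLines.print()
ssc.start()
ssc.awaitTermination()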
RE: hdfs streaming context
For the streaming example I am working on, it is accepted as (hdfs:///user/data) without the localhost info. Let me dig through my hdfs config. -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Monday, December 01, 2014 4:50 PM To: Benjamin Cuthbert Cc: user@spark.apache.org Subject: Re: hdfs streaming context …
Re: hdfs streaming context
Yes, but you can't follow three slashes with host:port. No host probably defaults to whatever is found in your HDFS config. On Mon, Dec 1, 2014 at 11:02 PM, Bui, Tri tri@verizonwireless.com wrote: …
RE: hdfs streaming context
Yep, no localhost. Usually I use hdfs:///user/data to indicate I want hdfs, or file:///user/data to indicate a local file directory. -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Monday, December 01, 2014 5:06 PM To: Bui, Tri Cc: Benjamin Cuthbert; user@spark.apache.org Subject: Re: hdfs streaming context …
Re: How take top N of top M from RDD as RDD
hi Debasish, I found this test code in the change you linked; would it collect all the products too? + val sortedProducts = products.toArray.sorted(ord.reverse) Yours, Xuefeng Wu 吴雪峰 敬上 On December 2, 2014, at 1:33 AM, Debasish Das debasish.da...@gmail.com wrote: …
Passing Java Options to Spark AM launching
Hi, How do I pass Java options (such as -XX:MaxMetaspaceSize=100M) when launching the AM or the task containers? This is related to running Spark on YARN (Hadoop 2.3.0). In the Map-Reduce case, setting a property such as mapreduce.map.java.opts would do the job. Any help would be highly appreciated. Regards, Mohammad
Re: Loading RDDs in a streaming fashion
"file => transform file into a bunch of records" What does this function do exactly? Does it load the file locally? Spark supports RDDs exceeding global RAM (cf. the terasort example), but if your example just loads each file locally, then this may cause problems. Instead, you should load each file into an RDD with context.textFile(), flatMap that, and union these RDDs. Also see http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote: This is a long shot, but... I'm trying to load a bunch of files spread out over hdfs into an RDD, and in most cases it works well, but for a few very large files, I exceed available memory. My current workflow basically works like this: context.parallelize(fileNames).flatMap { file => transform file into a bunch of records } I'm wondering if there are any APIs to somehow flush the records of a big dataset so I don't have to load them all into memory at once. I know this doesn't exist, but conceptually: context.parallelize(fileNames).streamMap { (file, stream) => for every 10K records write records to stream and flush } Keith
Re: Loading RDDs in a streaming fashion
Actually, I'm working with a binary format. The api allows reading out a single record at a time, but I'm not sure how to get those records into spark (without reading everything into memory from a single file at once). On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote: …
Re: Passing Java Options to Spark AM launching
Hi, have a look at the documentation for spark.driver.extraJavaOptions (which seems to have disappeared since I looked it up last week) and spark.executor.extraJavaOptions at http://spark.apache.org/docs/latest/configuration.html#runtime-environment. Tobias
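A hedged sketch of setting the executor-side option from code (the option value is illustrative; driver-side options generally have to be in place before the driver JVM starts, e.g. via spark-submit --conf or spark-defaults.conf, rather than set from a running program):
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
  .setAppName("JavaOptsExample")
  // applied to executor JVMs when YARN launches their containers
  .set("spark.executor.extraJavaOptions", "-XX:MaxMetaspaceSize=100M")
val sc = new SparkContext(conf)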
Re: Passing Java Options to Spark AM launching
Thanks Tobias for the answer. Does it work for the driver as well? Regards, Mohammad On Monday, December 1, 2014 5:30 PM, Tobias Pfeiffer t...@preferred.jp wrote: …
Re: Passing Java Options to Spark AM launching
Please check whether https://github.com/apache/spark/pull/3409#issuecomment-64045677 solves the problem for launching the AM. Thanks. Zhan Zhang On Dec 1, 2014, at 4:49 PM, Mohammad Islam misla...@yahoo.com.INVALID wrote: …
Re: Loading RDDs in a streaming fashion
Could you modify your function so that it streams through the files record by record and outputs them to hdfs, then read them all in as RDDs and take the union? That would only use bounded memory. On 1 December 2014 at 17:19, Keith Simmons ke...@pulse.io wrote: …
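Another option worth noting (this is an assumption about the binary format's reader, not something from the thread): if the reader can hand out one record at a time, flatMap can return an Iterator instead of a materialized collection, so records stream through the pipeline and a whole file is never held in memory at once.
// recordsOf is a hypothetical stand-in for the format's record-at-a-time reader
def recordsOf(file: String): Iterator[String] =
  Iterator.tabulate(3)(i => s"$file-record-$i")
val fileNames = sc.parallelize(Seq("part-0", "part-1"))
// flatMap accepts a TraversableOnce, so an Iterator is consumed lazily
val records = fileNames.flatMap(file => recordsOf(file))
records.collect().foreach(println)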
numpy arrays and spark sql
This works as expected in the 1.1 branch:

    from pyspark.sql import *

    rdd = sc.parallelize([range(0, 10), range(10, 20), range(20, 30)])

    # define the schema
    schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 value10"
    fields = [StructField(field_name, IntegerType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)

    # Apply the schema to the RDD.
    schemaRDD = sqlContext.applySchema(rdd, schema)

    # Register the table
    schemaRDD.registerTempTable("slice")

    # SQL can be run over SchemaRDDs that have been registered as a table.
    results = sqlContext.sql("SELECT value1 FROM slice")

    # The results of SQL queries are RDDs and support all the normal RDD operations.
    print results.collect()

However, changing the rdd to use a numpy array fails:

    import numpy as np

    rdd = sc.parallelize(np.arange(20).reshape(2, 10))

    # define the schema
    schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 value10"
    fields = [StructField(field_name, np.ndarray, True) for field_name in schemaString.split()]
    schema = StructType(fields)

    # Apply the schema to the RDD.
    schemaRDD = sqlContext.applySchema(rdd, schema)

The error is:

    Traceback (most recent call last):
      File "<stdin>", line 2, in <module>
      File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 1119, in applySchema
        _verify_type(row, schema)
      File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 735, in _verify_type
        % (dataType, type(obj)))
    TypeError: StructType(List(StructField(value1,<type 'numpy.ndarray'>,true),StructField(value2,<type 'numpy.ndarray'>,true),StructField(value3,<type 'numpy.ndarray'>,true),StructField(value4,<type 'numpy.ndarray'>,true),StructField(value5,<type 'numpy.ndarray'>,true),StructField(value6,<type 'numpy.ndarray'>,true),StructField(value7,<type 'numpy.ndarray'>,true),StructField(value8,<type 'numpy.ndarray'>,true),StructField(value9,<type 'numpy.ndarray'>,true),StructField(value10,<type 'numpy.ndarray'>,true))) can not accept object in type <type 'numpy.ndarray'>

I've tried np.int_ and np.int32 and they fail too. What type should I use to make numpy arrays work?
Re: Loading RDDs in a streaming fashion
Yep, that's definitely possible. It's one of the workarounds I was considering. I was just curious if there was a simpler (and perhaps more efficient) approach. Keith On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg andy.tw...@gmail.com wrote: Could you modify your function so that it streams through the files record by record and outputs them to HDFS, then read them all in as RDDs and take the union? That would only use bounded memory.
Re: ALS failure with size Integer.MAX_VALUE
Yes, the issue appears to be due to the 2GB block size limitation. I am hence looking for (user, product) block sizing suggestions to work around the block size limitation. On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: (It won't be that, since you see that the error occurs when reading a block from disk. I think this is an instance of the 2GB block size limitation.) On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi Bharath – I'm unsure if this is your problem, but the MatrixFactorizationModel in MLlib, which is the underlying component for ALS, expects your user/product fields to be integers. Specifically, the input to ALS is an RDD[Rating], and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds MAX_INT; could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven't seen this exact problem, but I'm sure we've seen similar issues. Please let me know if you have other questions. From: Bharath Ravi Kumar reachb...@gmail.com Date: Thursday, November 27, 2014 at 1:30 PM To: user@spark.apache.org Subject: ALS failure with size Integer.MAX_VALUE We're training a recommender with ALS in MLlib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 billion (~30GB of data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 and 1200. Irrespective of the block count (e.g. at 1200 blocks each), there are at least a couple of tasks that end up shuffle-reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception:

    java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
        at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
        at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
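For the MAX_INT check Ilya suggests, a quick Scala sketch, assuming the raw input can be parsed as comma-separated (user, product, rating) triples (the parsing and path are illustrative, not from this thread):

    import org.apache.spark.SparkContext

    def checkIdRange(sc: SparkContext, path: String): Unit = {
      val raw = sc.textFile(path).map { line =>
        val Array(u, p, r) = line.split(',')   // hypothetical input format
        (u.toLong, p.toLong, r.toDouble)
      }
      val maxUser = raw.map(_._1).max()
      val maxProduct = raw.map(_._2).max()
      // mllib's Rating is (Int, Int, Double), so any identifier above
      // Int.MaxValue would silently wrap if it were cast to Int.
      require(maxUser <= Int.MaxValue && maxProduct <= Int.MaxValue,
        s"identifier overflow: maxUser=$maxUser, maxProduct=$maxProduct")
    }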
Re: Calling spark from a java web application.
If you are able to use YARN in your hadoop cluster, then the following technique is pretty straightforward: http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/ We use this in our system and it's super easy to execute from our Tomcat application.
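For reference, the approach in that post boils down to driving the YARN client programmatically; a rough Scala sketch against the Spark 1.1-era org.apache.spark.deploy.yarn API (the constructor has changed across versions, and the paths and class names here are placeholders):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.{Client, ClientArguments}

    // Sketch: submit an application jar to YARN from inside a web container.
    def submitToYarn(): Unit = {
      val args = Array(
        "--jar",   "/path/to/app.jar",     // placeholder jar path
        "--class", "com.example.MyJob",    // placeholder main class
        "--arg",   "someArgument")
      val sparkConf = new SparkConf().setAppName("from-tomcat")
      val clientArgs = new ClientArguments(args, sparkConf)
      // Blocks until the YARN application finishes.
      new Client(clientArgs, new Configuration(), sparkConf).run()
    }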
Re: Loading RDDs in a streaming fashion
You may be able to construct RDDs directly from an iterator - not sure - you may have to subclass your own. On 1 December 2014 at 18:40, Keith Simmons ke...@pulse.io wrote: Yep, that's definitely possible. It's one of the workarounds I was considering. I was just curious if there was a simpler (and perhaps more efficient) approach. Keith
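For what it's worth, if the binary format's reader can be wrapped in a lazy Iterator, flatMap itself may be enough, since flatMap accepts any TraversableOnce and Spark pulls records one at a time. A sketch, with readRecords standing in for the format's record-at-a-time API (an assumption, not a real function from this thread):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    def boundedLoad(sc: SparkContext, fileNames: Seq[String]): RDD[String] = {
      // Stand-in for the binary format's reader; the key point is that it
      // returns a lazy Iterator rather than a materialized collection.
      def readRecords(file: String): Iterator[String] = ???

      // Each record is handed to Spark before the next one is read, so a
      // single large file never has to fit in memory at once.
      sc.parallelize(fileNames).flatMap(file => readRecords(file))
    }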
Re: numpy arrays and spark sql
applySchema() only accepts an RDD of Row/list/tuple; it does not work with numpy.array. After applySchema(), the Python RDD will be pickled and unpickled in the JVM anyway, so you get no benefit from using numpy.array. It will work if you convert each ndarray into a list:

    schemaRDD = sqlContext.applySchema(rdd.map(list), schema)

On Mon, Dec 1, 2014 at 6:33 PM, Joseph Winston josephwins...@me.com wrote: What type should I use to make numpy arrays work?
java.io.IOException: Filesystem closed
Hi, I am facing the following exception when I submit a Spark application. The log file shows:

    14/12/02 11:52:58 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
    java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
        at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1668)
        at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1629)
        at org.apache.hadoop.hdfs.DFSOutputStream.sync(DFSOutputStream.java:1614)
        at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:120)
        at org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
        at org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.util.FileLogger.flush(FileLogger.scala:158)
        at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:87)
        at org.apache.spark.scheduler.EventLoggingListener.onJobEnd(EventLoggingListener.scala:112)
        at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
        at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
        at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
        at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)
        at org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:52)
        at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
        at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)

Can someone please help me resolve this? Thanks
Re: java.io.IOException: Filesystem closed
What is the application that you are submitting? It looks like you might have invoked the FileSystem inside the app and then closed it there. Thanks Best Regards On Tue, Dec 2, 2014 at 11:59 AM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, I am facing the following exception when I submit a Spark application: java.io.IOException: Filesystem closed
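If that is what is happening, note that FileSystem.get() returns a JVM-wide cached instance by default, so closing it also closes the handle Spark's event logger is writing through. A Scala sketch of the usual workaround (the namenode address and path are placeholders):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    val uri = new URI("hdfs://namenode:8020") // placeholder namenode address

    // FileSystem.newInstance() bypasses the shared cache, so closing this
    // handle does not break Spark's EventLoggingListener.
    val privateFs = FileSystem.newInstance(uri, conf)
    try {
      privateFs.exists(new Path("/some/path")) // placeholder use of the fs
    } finally {
      privateFs.close() // safe: only the private instance is closed
    }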