merge elements in a Spark RDD under custom condition

2014-12-01 Thread Pengcheng YIN
Hi Pro,
I want to merge elements in a Spark RDD when two elements satisfy a certain 
condition.

Suppose there is an RDD[Seq[Int]], where some of the Seq[Int] in this RDD contain 
overlapping elements. The task is to merge all overlapping Seq[Int] in this 
RDD and store the result in a new RDD.

For example, suppose RDD[Seq[Int]] = [[1,2,3], [2,4,5], [1,2], [7,8,9]], the 
result should be [[1,2,3,4,5], [7,8,9]].

Since the RDD[Seq[Int]] is very large, I cannot do it in the driver program. Is it 
possible to get it done using distributed groupBy/map/reduce, etc.?

Thanks in advance,

Pengcheng
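
One possible distributed approach (a sketch, not an answer from the thread): treat 
the merge as a connected-components problem, where every integer is a vertex and 
each Seq[Int] contributes edges chaining its elements together, so overlapping Seqs 
land in the same component. The data below is just the example from the question; 
everything else (the GraphX wiring, a spark-shell-style `sc`, the name `seqs`) is an 
assumption about how one might do it.

import org.apache.spark.SparkContext._          // pair-RDD implicits needed on Spark 1.x
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// `seqs` stands for the RDD[Seq[Int]] from the question (here just the example data).
val seqs: RDD[Seq[Int]] =
  sc.parallelize(Seq(Seq(1, 2, 3), Seq(2, 4, 5), Seq(1, 2), Seq(7, 8, 9)))

// Each Seq contributes edges chaining its elements, e.g. [1,2,3] -> (1,2), (2,3).
// (Single-element Seqs would need a self-edge to survive this step.)
val edges: RDD[Edge[Int]] = seqs.flatMap { s =>
  s.sliding(2).collect { case Seq(a, b) => Edge(a.toLong, b.toLong, 1) }
}

// Overlapping Seqs end up in the same connected component; regroup by component id.
val merged: RDD[Seq[Int]] = Graph.fromEdges(edges, 1)
  .connectedComponents()
  .vertices                                      // (vertexId, componentId)
  .map { case (v, c) => (c, v.toInt) }
  .groupByKey()
  .map { case (_, vs) => vs.toSeq.distinct.sorted }

merged.collect().foreach(println)                // List(1, 2, 3, 4, 5) and List(7, 8, 9)

An iterative pairwise merge with repeated self-joins would also work, but the 
component formulation keeps everything in ordinary RDD/GraphX operations.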

Re: java.io.InvalidClassException: org.apache.spark.api.java.JavaUtils$SerializableMapWrapper; no valid constructor

2014-12-01 Thread lokeshkumar
The workaround was to wrap the map returned by the Spark libraries in a HashMap
and then broadcast that.
Could anyone please let me know if there is an issue open for this? 
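
A hedged illustration of that workaround (the poster's actual code is not shown in 
the thread; `javaRdd` and the types are hypothetical): copy the returned map into a 
plain java.util.HashMap before broadcasting, so the SerializableMapWrapper itself 
never has to be deserialized on the executors.

// Sketch only: detach the result from Spark's internal map wrapper before broadcasting.
val fromSpark: java.util.Map[String, java.lang.Long] = javaRdd.countByValue() // hypothetical source map
val plainCopy = new java.util.HashMap[String, java.lang.Long](fromSpark)      // plain, serializable copy
val broadcastMap = sc.broadcast(plainCopy)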



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-tp20034p20070.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down

2014-12-01 Thread Alexey Romanchuk
Hello spark users!

I found lots of strange messages in driver log. Here it is:

2014-12-01 11:54:23,849 [sparkDriver-akka.actor.default-dispatcher-25]
ERROR
akka.remote.EndpointWriter[akka://sparkDriver/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkExecutor%40data1.hadoop%3A17372-5/endpointWriter]
- AssociationError [akka.tcp://sparkDriver@10.54.87.173:55034] <-
[akka.tcp://sparkExecutor@data1.hadoop:17372]: Error [Shut down address:
akka.tcp://sparkExecutor@data1.hadoop:17372] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://sparkExecutor@data1.hadoop:17372
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]

I get this message twice for every worker: first for driverPropsFetcher
and then for sparkExecutor. It looks like Spark shuts down the remote Akka system
incorrectly, or there is some race condition in this process: the driver sends
some data to a worker, but the worker's actor system is already shutting down.

Except for this message, everything works fine. But it is an ERROR-level
message, and it ends up in my "ERROR only" log.

Do you have any idea whether this is a configuration issue, a bug in Spark or
Akka, or something else?

Thanks!
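
An aside, not an answer from the thread: if the goal is only to keep this known 
shutdown noise out of an ERROR-only log, and assuming log4j is the logging backend, 
one option is to raise the threshold for the specific logger that emits it (the 
logger name is taken from the log line above):

# log4j.properties fragment (assumption: log4j backend is in use)
# Silence shutdown-association errors from this endpoint writer logger only.
log4j.logger.akka.remote.EndpointWriter=FATAL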


Re: java.io.InvalidClassException: org.apache.spark.api.java.JavaUtils$SerializableMapWrapper; no valid constructor

2014-12-01 Thread Josh Rosen
SerializableMapWrapper was added in
https://issues.apache.org/jira/browse/SPARK-3926; do you mind opening a new
JIRA and linking it to that one?

On Mon, Dec 1, 2014 at 12:17 AM, lokeshkumar  wrote:

> The workaround was to wrap the map returned by spark libraries into HashMap
> and then broadcast them.
> Could anyone please let me know if there is any issue open?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/java-io-InvalidClassException-org-apache-spark-api-java-JavaUtils-SerializableMapWrapper-no-valid-cor-tp20034p20070.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark SQL 1.0.0 - RDD from snappy compress avro file

2014-12-01 Thread cjdc
Hi Vikas and Simone,

thanks for the replies.
Yeah I understand this would be easier with 1.2 but this is completely out
of my control. I really have to work with 1.0.0.

About Simone's approach, during the imports I get:

scala> import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat }
<console>:17: error: object mapreduce is not a member of package org.apache.avro
       import org.apache.avro.mapreduce.{ AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat }
                              ^

scala> import org.apache.avro.mapred.AvroKey
<console>:17: error: object mapred is not a member of package org.apache.avro
       import org.apache.avro.mapred.AvroKey
                              ^

scala> import com.twitter.chill.avro.AvroSerializer
<console>:18: error: object avro is not a member of package com.twitter.chill
       import com.twitter.chill.avro.AvroSerializer
                                ^
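
A hedged guess, not from the thread: "is not a member of package" errors like these 
usually mean the avro-mapred and chill-avro jars are simply not on the shell 
classpath. In an sbt build the dependencies would look roughly like the following 
(versions are illustrative assumptions), and the resulting jars would then have to 
be passed to spark-shell (e.g. via --jars, or ADD_JARS on older 1.0.x shells):

// build.sbt fragment (sketch; versions are assumptions, not taken from the thread)
libraryDependencies ++= Seq(
  "org.apache.avro" %  "avro-mapred" % "1.7.6",  // provides org.apache.avro.mapred / mapreduce
  "com.twitter"     %% "chill-avro"  % "0.4.0"   // provides com.twitter.chill.avro
)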






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-0-0-RDD-from-snappy-compress-avro-file-tp19998p20073.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Creating a SchemaRDD from an existing API

2014-12-01 Thread Niranda Perera
Hi Michael,

About this new data sources API, what types of data sources will it support?
Do they necessarily have to be RDBMSs?

Cheers

On Sat, Nov 29, 2014 at 12:57 AM, Michael Armbrust 
wrote:

> You probably don't need to create a new kind of SchemaRDD.  Instead I'd
> suggest taking a look at the data sources API that we are adding in Spark
> 1.2.  There is not a ton of documentation, but the test cases show how to
> implement the various interfaces, and there is an example library for
> reading Avro data.
>
> On Thu, Nov 27, 2014 at 10:31 PM, Niranda Perera  wrote:
>
>> Hi,
>>
>> I am evaluating Spark for an analytic component where we do batch
>> processing of data using SQL.
>>
>> So, I am particularly interested in Spark SQL and in creating a SchemaRDD
>> from an existing API [1].
>>
>> This API exposes elements in a database as datasources. Using the methods
>> allowed by this data source, we can access and edit data.
>>
>> So, I want to create a custom SchemaRDD using the methods and provisions
>> of
>> this API. I tried going through Spark documentation and the Java Docs, but
>> unfortunately, I was unable to come to a final conclusion if this was
>> actually possible.
>>
>> I would like to ask the Spark Devs,
>> 1. As of the current Spark release, can we make a custom SchemaRDD?
>> 2. What is the extension point to a custom SchemaRDD? or are there
>> particular interfaces?
>> 3. Could you please point me the specific docs regarding this matter?
>>
>> Your help in this regard is highly appreciated.
>>
>> Cheers
>>
>> [1]
>>
>> https://github.com/wso2-dev/carbon-analytics/tree/master/components/xanalytics
>>
>> --
>> *Niranda Perera*
>> Software Engineer, WSO2 Inc.
>> Mobile: +94-71-554-8430
>> Twitter: @n1r44 
>>
>
>


-- 
*Niranda Perera*
Software Engineer, WSO2 Inc.
Mobile: +94-71-554-8430
Twitter: @n1r44 
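
As a rough illustration of why the data sources API is not tied to RDBMSs: any 
source that can report a schema and produce an RDD[Row] can be plugged in. The 
sketch below follows the Spark 1.2 sources API as documented at the time (the exact 
class shapes changed in later releases); the class names, the "endpoint" parameter 
and the hard-coded rows are hypothetical, not from the thread.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources._

// Entry point looked up by Spark SQL; builds a relation from user-supplied options.
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    MyApiRelation(parameters("endpoint"))(sqlContext)
}

// Wraps the external API: declare a schema and produce rows on demand.
case class MyApiRelation(endpoint: String)(@transient val sqlContext: SQLContext)
  extends TableScan {

  override def schema: StructType =
    StructType(Seq(StructField("id", StringType, nullable = false),
                   StructField("name", StringType, nullable = true)))

  override def buildScan(): RDD[Row] =
    // A real implementation would fetch from the external API instead of parallelize.
    sqlContext.sparkContext.parallelize(Seq(Row("1", "a"), Row("2", "b")))
}

Registered through the 1.2 CREATE TEMPORARY TABLE ... USING syntax, such a relation 
can then be queried like any other table, which is how the Avro example library 
mentioned above is used.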


RE: Unable to compile spark 1.1.0 on windows 8.1

2014-12-01 Thread Ishwardeep Singh
Hi Judy,

Thank you for your response.

When I try to compile using maven, "mvn -Dhadoop.version=1.2.1 -DskipTests
clean package", I get the error "Error: Could not find or load main class".
I have maven 3.0.4.

And when I run the command "sbt package" I get the same exception as earlier.

I have done the following steps:

1. Downloaded spark-1.1.0.tgz from the Spark site and extracted it to the folder
"d:\myworkplace\software\spark-1.1.0".
2. Downloaded sbt-0.13.7.zip and extracted it to the folder
"d:\myworkplace\software\sbt".
3. Updated the PATH environment variable to include
"d:\myworkplace\software\sbt\bin".
4. Navigated to the spark folder d:\myworkplace\software\spark-1.1.0.
5. Ran the command "sbt assembly".
6. As a side effect of this command a number of libraries are downloaded, and
I get an initial error that the path
C:\Users\ishwardeep.singh\.sbt\0.13\staging\ec3aa8f39111944cc5f2\sbt-pom-reader
does not exist. 
7. I manually created the subfolder "ec3aa8f39111944cc5f2\sbt-pom-reader"
and retried, only to get the next error described in my initial post.

Is this the correct procedure to compile spark 1.1.0? Please let me know.

Hoping to hear from you soon.

Regards,
ishwardeep
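
For reference (an aside, not from the thread): the Spark 1.1 "Building Spark with 
Maven" documentation recommends raising Maven's memory limits before building, which 
on a Windows shell would look roughly like the lines below. Whether this is related 
to the "Could not find or load main class" error here is not certain; the mvn/sbt 
invocations themselves are the ones already quoted above.

REM Sketch based on the Spark 1.1 Maven build docs (values illustrative)
set MAVEN_OPTS=-Xmx2g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
mvn -Dhadoop.version=1.2.1 -DskipTests clean package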



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-compile-spark-1-1-0-on-windows-8-1-tp19996p20075.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Kryo exception for CassandraSQLRow

2014-12-01 Thread shahab
I am using the Cassandra-Spark connector to pull data from Cassandra, process
it and write it back to Cassandra.

Now I am getting the following exception, apparently from Kryo
serialisation. Does anyone know what the reason is and how this can be solved?

I also tried to register "org.apache.spark.sql.cassandra.CassandraSQLRow"
via "kryo.register", but even this did not solve the problem; the exception
remains.

WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 7,
ip-X-Y-Z): com.esotericsoftware.kryo.KryoException: Unable to find class:
org.apache.spark.sql.cassandra.CassandraSQLRow
Serialization trace:
_2 (org.apache.spark.util.MutablePair)

com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)

com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)

com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)

com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)

org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)

org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1171)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)

org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)

org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1218)
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:904)
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:904)

org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)

org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)



I am using Spark 1.1.0 with cassandra-spark-connector 1.1.0; here is the
build:

   "org.apache.spark" % "spark-mllib_2.10" % "1.1.0"
exclude("com.google.guava", "guava"),

"com.google.guava" % "guava" % "16.0" % "provided",

"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0"
exclude("com.google.guava", "guava")   withSources() withJavadoc(),

"org.apache.cassandra" % "cassandra-all" % "2.1.1"
exclude("com.google.guava", "guava") ,

"org.apache.cassandra" % "cassandra-thrift" % "2.1.1"
exclude("com.google.guava", "guava") ,

"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.2"
exclude("com.google.guava", "guava") ,

"org.apache.spark" %% "spark-core" % "1.1.0" % "provided"
exclude("com.google.guava", "guava") exclude("org.apache.hadoop", "hadoop
-core"),

"org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided"
exclude("com.google.guava", "guava"),

"org.apache.spark" %% "spark-catalyst"   % "1.1.0"  % "provided"
exclude("com.google.guava", "guava") exclude("org.apache.spark",
"spark-core"),

 "org.apache.spark" %% "spark-sql" % "1.1.0" %  "provided"
exclude("com.google.guava", "guava") exclude("org.apache.spark",
"spark-core"),

"org.apache.spark" %% "spark-hive" % "1.1.0" % "provided"
exclude("com.google.guava", "guava") exclude("org.apache.spark",
"spark-core"),

"org.apache.hadoop" % "hadoop-client" % "1.0.4" % "provided",

best,
/Shahab
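
An aside, not a confirmed fix: in Spark, custom Kryo registrations are normally 
wired in through a KryoRegistrator referenced by spark.kryo.registrator, and the 
class being registered still has to be present in the jars shipped to the executors. 
A sketch of that wiring follows (class and application names are illustrative; 
whether it resolves this particular "Unable to find class" depends on the deployment):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.{SparkConf, SparkContext}

// Registers the connector's row class with Kryo (the class must also be on the executor classpath).
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[org.apache.spark.sql.cassandra.CassandraSQLRow])
  }
}

object KryoSetup {
  def makeContext(): SparkContext = {
    val conf = new SparkConf()
      .setAppName("cassandra-sql-job") // illustrative name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "MyRegistrator") // use the fully-qualified name if packaged
    new SparkContext(conf)
  }
}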


Spark 1.1.0: weird spark-shell behavior

2014-12-01 Thread Reinis Vicups

Hello,

I have two weird effects when working with spark-shell:


1. The code below, executed in spark-shell, causes the exception that follows. At 
the same time it works perfectly when submitted with spark-submit:


import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.mahout.math.VectorWritable
import com.google.common.io.ByteStreams
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val hConf = HBaseConfiguration.create()
hConf.set("hbase.defaults.for.version.skip", "true")
hConf.set("hbase.defaults.for.version", "0.98.6-cdh5.2.0")
hConf.set(HConstants.ZOOKEEPER_QUORUM, "myserv")
hConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
hConf.set(TableInputFormat.INPUT_TABLE, "MyNS:MyTable")
val rdd = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat], 
classOf[ImmutableBytesWritable], classOf[Result])

rdd.count()

--- Exception ---

14/12/01 10:45:24 ERROR ExecutorUncaughtExceptionHandler: Uncaught 
exception in thread Thread[Executor task launch worker-0,5,main]

 java.lang.ExceptionInInitializerError
at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:197)
at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:159)
at 
org.apache.hadoop.hbase.mapreduce.TableInputFormat.setConf(TableInputFormat.java:101)
at 
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:113)

at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: hbase-default.xml file seems to 
be for and old version of HBase (null), this version is 0.98.6-cdh5.2.0
at 
org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:73)
at 
org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:105)
at 
org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:116)
at 
org.apache.hadoop.hbase.client.HConnectionManager.(HConnectionManager.java:222)

... 14 more

We have already checked most of the trivial stuff: class paths, the existence 
of tables and column groups, enabling the HBase-specific settings to avoid 
the version check, and so on. It appears that the supplied HBase 
configuration is completely ignored by the context. We tried to solve this 
issue by instantiating our own SparkContext and ran into the second 
weird effect:


2. When attempting to instantiate our own SparkContext we get the exception 
below:


// imports block
...

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)

--- Exception ---

2014-12-01 10:42:24,966 WARN  o.e.j.u.c.AbstractLifeCycle - FAILED 
SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Die Adresse 
wird bereits verwendet

java.net.BindException: Die Adresse wird bereits verwendet
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)

at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at 
org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at 
org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at 
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)

at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at 
org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:199)
at 
org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
at 
org.apache.spark.ui.JettyUtils$$anonfun$4.apply(JettyUtils.scala:209)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1449)

at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
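
An aside, not from the thread: spark-shell already creates a SparkContext (`sc`) and 
binds its web UI to port 4040, so constructing a second context inside the shell 
tries to bind the same port again, which matches the BindException above ("Die 
Adresse wird bereits verwendet" is German for "the address is already in use"). One 
usually either reuses the shell's `sc`, or stops it before building a replacement:

// Sketch: release the port held by the shell's context before creating a new one.
sc.stop()
val conf = new org.apache.spark.SparkConf().setAppName("Simple Application")
val sc2 = new org.apache.spark.SparkContext(conf)   // `sc2` is an illustrative name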

Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread m.sarosh
Hi,

I am integrating Kafka and Spark using spark-streaming. I have created a topic 
as a Kafka producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test


I am publishing messages to Kafka and trying to read them using spark-streaming 
Java code and display them on screen.
The daemons are all up: Spark master, worker; zookeeper; kafka.
I am writing Java code for this using KafkaUtils.createStream; the code is below:

package com.spark;

import scala.Tuple2;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import java.util.Map;
import java.util.HashMap;

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("Usage: spark-submit --class com.spark.SparkStream "
                + "target/SparkStream-with-dependencies.jar <zookeeper> <group> <topics>");
            System.exit(1);
        }

        // One receiver thread per topic
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext(
            "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        System.out.println("Connection done");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, String>()
        {
            public String call(Tuple2<String, String> message)
            {
                System.out.println("NewMessage: " + message._2()); // for debugging
                return message._2();
            }
        });

        data.print();

        jssc.start();
        jssc.awaitTermination();
    }
}


I am running the job, and at another terminal I am running the kafka-console-producer 
to publish messages:
#bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>Hi kafka
>second message
>another message

But the output logs at the spark-streaming console don't show the messages; 
they show zero blocks received:


---
Time: 1417107363000 ms
---

14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 
1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 
1417107363000 ms.0 from job set of time 1417107363000 ms
14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for 
time 1417107363000 ms (execution: 0.000 s)
14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 
1417107363000 ms
14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD 
BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 
ms
14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory
14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks


Why isn't the data block getting received? I have tried the Kafka producer and 
consumer on the console (bin/kafka-console-producer and 
bin/kafka-console-consumer...) and they work perfectly, so why not the code above? 
Please help me.


Regards,
Aiman Sarosh





Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread Akhil Das
It says:

 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient memory

A quick guess would be that you are giving the wrong master URL
(spark://192.168.88.130:7077). Open the web UI running on port 8080 and use
the master URL listed at the top left corner of the page.

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM,  wrote:

>  Hi,
>
>
>
> I am integrating Kafka and Spark, using spark-streaming. I have created a
> topic as a kafka producer:
>
>
>
> bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 1 --topic test
>
>
>
>
>
> I am publishing messages in kafka and trying to read them using
> spark-streaming java code and displaying them on screen.
>
> The daemons are all up: Spark-master,worker; zookeeper; kafka.
>
> I am writing a java code for doing it, using KafkaUtils.createStream
>
> code is below:
>
>
>
> *package* *com.spark*;
>
>
>
> *import* scala.Tuple2;
>
> *import* *kafka*.serializer.Decoder;
>
> *import* *kafka*.serializer.Encoder;
>
> *import* org.apache.spark.streaming.Duration;
>
> *import* org.apache.spark.*;
>
> *import* org.apache.spark.api.java.function.*;
>
> *import* org.apache.spark.api.java.*;
>
> *import* *org.apache.spark.streaming.kafka*.KafkaUtils;
>
> *import* *org.apache.spark.streaming.kafka*.*;
>
> *import* org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> *import* org.apache.spark.streaming.api.java.JavaPairDStream;
>
> *import* org.apache.spark.streaming.api.java.JavaDStream;
>
> *import* org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
>
> *import* java.util.Map;
>
> *import* java.util.HashMap;
>
>
>
> *public* *class* *SparkStream* {
>
>*public* *static* *void* main(String args[])
>
>{
>
>   *if*(args.length != 3)
>
>   {
>
>  System.*out*.println("Usage: spark-submit –class
> com.spark.SparkStream target/SparkStream-with-dependencies.jar
>   ");
>
>  System.*exit*(1);
>
>   }
>
>
>
>
>
>   Map topicMap = *new*
> HashMap();
>
>   String[] topic = args[2].split(",");
>
>   *for*(String t: topic)
>
>   {
>
>  topicMap.put(t, *new* Integer(1));
>
>   }
>
>
>
>   JavaStreamingContext jssc = *new* JavaStreamingContext(
> "spark://192.168.88.130:7077", "SparkStream", *new* Duration(3000));
>
>   JavaPairReceiverInputDStream messages =
> *KafkaUtils*.createStream(jssc, args[0], args[1], topicMap );
>
>
>
>   System.*out*.println("Connection done");
>
>   JavaDStream data = messages.map(*new* 
> *Function String>, String>()*
>
> {
>
>*public* String
> call(Tuple2 message)
>
>{
>
>   System.*out*
> .println("NewMessage: "+message._2()); //for debugging
>
>   *return*
> message._2();
>
>}
>
> });
>
>
>
> data.print();
>
>
>
>   jssc.start();
>
>   jssc.awaitTermination();
>
>
>
>}
>
> }
>
>
>
>
>
> I am running the job, and at other terminal I am running kafka-producer to
> publish messages:
>
> #bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>
> >Hi kafka
>
> >second message
>
> >another message
>
>
>
> But the output logs at the spark-streaming console doesn't show the
> messages, but shows zero blocks received:
>
>
>
>
>
> ---
>
> Time: 1417107363000 ms
>
> ---
>
>
>
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming
> job 1417107363000 ms.0 from job set of time 1417107363000 ms
>
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming
> job 1417107363000 ms.0 from job set of time 1417107363000 ms
>
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s
> for time 1417107363000 ms (execution: 0.000 s)
>
> 14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time
> 1417107363000 ms
>
> 14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence
> list
>
> 14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
>
> 14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD
> BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417107363000 ms
>
> 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has
> not accepted any resources; check your cluster UI to ensure that workers
> are regi

RE: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread m.sarosh
Hi,

The Spark master is working, and I have given the same URL in the code
(screenshot of the master web UI omitted).

The warning is gone, and the new log is:
---
Time: 141742785 ms
---

INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0 
from job set of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0 
from job set of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785 ms 
(execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD 
(Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 25
INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD 
(Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 24
INFO  [sparkDriver-akka.actor.default-dispatcher-5] kafka.KafkaInputDStream 
(Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[24] at BlockRDD 
at ReceiverInputDStream.scala:69 of time 141742785 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.ReceiverTracker 
(Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
---
Time: 1417427853000 ms
---

INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0 
from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0 
from job set of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000 ms 
(execution: 0.001 s)
INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler 
(Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD 
(Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 27
INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD 
(Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager 
(Logging.scala:logInfo(59)) - Removing RDD 26
INFO  [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream 
(Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[26] at BlockRDD 
at ReceiverInputDStream.scala:69 of time 1417427853000 ms
INFO  [sparkDriver-akka.actor.default-dispatcher-6] scheduler.ReceiverTracker 
(Logging.scala:logInfo(59)) - Stream 0 received 0 blocks

What should my approach be now?
I need urgent help.

Regards,
Aiman

From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Monday, December 01, 2014 3:56 PM
To: Sarosh, M.
Cc: user@spark.apache.org
Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

It says:

 14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient memory

A quick guess would be, you are giving the wrong master url. ( 
spark://192.168.88.130:7077 ) Open the webUI 
running on port 8080 and use the master url listed there on top left corner of 
the page.

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, 
mailto:m.sar...@accenture.com>> wrote:
Hi,

I am integrating Kafka and Spark, using spark-streaming. I have created a topic 
as a kafka producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic test


I am publishing messages in kafka and trying to read them using spark-streaming 
java code and displaying them on screen.
The daemons are all up: Spark-master,worker; zookeeper; kafka.
I am writing a java code for doing it, using KafkaUtils.createStream
code is below:

package com.spark;

import scala.Tuple2;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import org.apache.spark.streaming.Duration;
import org.apache.spark.*;
import org.apache.spark.api.java.funct

RE: Kryo exception for CassandraSQLRow

2014-12-01 Thread Ashic Mahtab
Don't know if this'll solve it, but if you're on Spark 1.1, Cassandra Connector 
version 1.1.0 final fixed the Guava backwards-compatibility issue. Maybe taking out 
the guava exclusions might help?

Date: Mon, 1 Dec 2014 10:48:25 +0100
Subject: Kryo exception for CassandraSQLRow
From: shahab.mok...@gmail.com
To: user@spark.apache.org

I am using Cassandra-Spark connector to pull data from Cassandra, process it 
and write it back to Cassandra.
 Now I am  getting the following exception, and apparently it is Kryo 
serialisation. Does anyone what is the reason and how this can be solved?
I also tried to register "org.apache.spark.sql.cassandra.CassandraSQLRow" in  
"kryo.register" , but even this did not solve the problem and exception remains.
WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 7, ip-X-Y-Z): 
com.esotericsoftware.kryo.KryoException: Unable to find class: 
org.apache.spark.sql.cassandra.CassandraSQLRowSerialization trace:_2 
(org.apache.spark.util.MutablePair)
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)

com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)

com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)

org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)   
 
org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1171)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)   
 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)  
  scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1218)
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:904)
org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:904)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)  
  
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1143)  
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
   java.lang.Thread.run(Thread.java:745)


I am using  Spark 1.1.0 with cassandra-spark connector 1.1.0 , here is the 
build:







   "org.apache.spark" % "spark-mllib_2.10" % "1.1.0" 
exclude("com.google.guava", "guava"),

"com.google.guava" % "guava" % "16.0" % "provided",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0" 
exclude("com.google.guava", "guava")   withSources() withJavadoc(),

"org.apache.cassandra" % "cassandra-all" % "2.1.1"  
exclude("com.google.guava", "guava") ,

"org.apache.cassandra" % "cassandra-thrift" % "2.1.1"  
exclude("com.google.guava", "guava") ,

"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.2"  
exclude("com.google.guava", "guava") ,

"org.apache.spark" %% "spark-core" % "1.1.0" % "provided" 
exclude("com.google.guava", "guava") exclude("org.apache.hadoop", 
"hadoop-core"),

"org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided"  
exclude("com.google.guava", "guava"),

"org.apache.spark" %% "spark-catalyst"   % "1.1.0"  % "provided" 
exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),

 "org.apache.spark" %% "spark-sql" % "1.1.0" %  "provided" 
exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"),

"org.apache.spark" %% "spark-hive" % "1.1.0" % "provided" 
exclude("com.google.guava", "guava") exclude("org.apache.spark", "spark-core"), 
   

"org.apache.hadoop" % "hadoop-client" % "1.0.4" % "provided",

best,/Shahab
  

Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks

2014-12-01 Thread Akhil Das
I see you have no worker machines to execute the job
(screenshot of the master web UI showing no workers omitted).

You haven't configured your Spark cluster properly.

A quick fix to get it running would be to run it in local mode; for that, change
this line

JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077",
"SparkStream", new Duration(3000));

to this

JavaStreamingContext jssc = new JavaStreamingContext("local[4]",
"SparkStream", new Duration(3000));


Thanks
Best Regards

On Mon, Dec 1, 2014 at 4:18 PM,  wrote:

>  Hi,
>
>
>
> The spark master is working, and I have given the same url in the code:
>
>
>
> The warning is gone, and the new log is:
>
> ---
>
> Time: 141742785 ms
>
> ---
>
>
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 141742785 ms.0
> from job set of time 141742785 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 141742785 ms.0
> from job set of time 141742785 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 141742785
> ms (execution: 0.001 s)
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 141742785 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 25
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 24
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5]
> kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of
> RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 141742785 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4]
> scheduler.ReceiverTracker (Logging.scala:logInfo(59)) *- Stream 0
> received 0 blocks*
>
> ---
>
> Time: 1417427853000 ms
>
> ---
>
>
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0
> from job set of time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0
> from job set of time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000
> ms (execution: 0.001 s)
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 27
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 26
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4]
> kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of
> RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6]
> scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - *Stream 0
> received 0 blocks*
>
>
>
> What should be my approach now ?
>
> Need urgent help.
>
>
>
> Regards,
>
> Aiman
>
>
>
> *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
> *Sent:* Monday, December 01, 2014 3:56 PM
> *To:* Sarosh, M.
> *Cc:* user@spark.apache.org
> *Subject:* Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
>
>
> It says:
>
>
>
>  14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
> accepted any resources; check your cluster UI to ensure that workers are
> registered and have sufficient memory
>
>
>
> A quick guess would be, you are giving the wrong master url. ( spark://
> 192.168.88.130:7077 ) Open the webUI running on port 8080 and use the
> master url listed there on top left corner of the page.
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Dec 1, 2014 at 3:42 PM,  wrote:
>
> Hi,
>
>
>
> I am integr

Re: Setting network variables in spark-shell

2014-12-01 Thread Shixiong Zhu
Don't set `spark.akka.frameSize` to 1. The max value of
`spark.akka.frameSize` is 2047. The unit is MB.

Best Regards,
Shixiong Zhu

2014-12-01 0:51 GMT+08:00 Yanbo :

>
> Try to use spark-shell --conf spark.akka.frameSize=1
>
> 在 2014年12月1日,上午12:25,Brian Dolan  写道:
>
> Howdy Folks,
>
> What is the correct syntax in 1.0.0 to set networking variables in spark
> shell?  Specifically, I'd like to set the spark.akka.frameSize
>
> I'm attempting this:
>
> spark-shell -Dspark.akka.frameSize=1 --executor-memory 4g
>
>
> Only to get this within the session:
>
> System.getProperty("spark.executor.memory")
> res0: String = 4g
> System.getProperty("spark.akka.frameSize")
> res1: String = null
>
>
> I don't believe I am violating protocol, but I have also posted this to
> SO:
> http://stackoverflow.com/questions/27215288/how-do-i-set-spark-akka-framesize-in-spark-shell
>
> ~~
> May All Your Sequences Converge
>
>
>
>
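
An aside on the original question (not from the thread): in Spark 1.x the shell's 
effective configuration is visible on the SparkContext itself, so checking a setting 
through sc.getConf is usually more reliable than System.getProperty, assuming 
sc.getConf is available in the release in use, e.g.:

// Returns the configured value, or the supplied default string if the key is unset.
sc.getConf.get("spark.akka.frameSize", "not set")
sc.getConf.get("spark.executor.memory", "not set")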


Is Spark the right tool for me?

2014-12-01 Thread Stadin, Benjamin
Hi all,

I need some advice on whether Spark is the right tool for my zoo. My requirements 
share commonalities with „big data“, workflow coordination and „reactive“ event-driven 
data processing (as in, for example, Haskell Arrows), which doesn’t make 
it any easier to decide on a tool set.

NB: I have asked a similar question on the Storm mailing list, but have been 
deferred to Spark. I previously thought Storm was closer to my needs – but 
maybe neither is.

To explain my needs it’s probably best to give an example scenario:

 *   A user uploads small files (typically 1-200 files, file size typically 
2-10MB per file)
 *   Files should be converted in parallel and on available nodes. The 
conversion is actually done via native tools, so there is not so much big data 
processing required, but dynamic parallelization (so for example to split the 
conversion step into as many conversion tasks as files are available). The 
conversion typically takes between several minutes and a few hours.
 *   The converted files are gathered and stored in a single database 
(containing geometries for rendering)
 *   Once the db is ready, a web map server is (re-)configured and the user can 
make small updates to the data set via a web UI.
 *   … Some other data processing steps which I leave away for brevity …
 *   There will be initially only a few concurrent users, but the system shall 
be able to scale if needed

My current thoughts:

 *   I should avoid uploading files into the distributed storage during 
conversion, and should probably instead have each conversion filter download the 
file it is actually converting from a shared place. Otherwise it’s bad for 
scalability reasons (too many redundant copies of the same temporary files if there 
are many concurrent users and many cluster nodes).
 *   Apache Oozie seems an option to chain together my pipes into a workflow. 
But is it a good fit with Spark? What options do I have with Spark to chain a 
workflow from pipes?
 *   Apache Crunch seems to make it easy to dynamically parallelize tasks 
(Oozie itself can’t do this). But I may not need Crunch after all if I have 
Spark, and it also doesn’t seem to fit my last problem below.
 *   The part that causes me the most headache is the user-interactive db 
update: I am considering Kafka as a message bus to broker between the web UI and 
a custom db handler (nb, the db is a SQLite file). But what about update 
responsiveness: won’t Spark introduce some lag (as opposed to Storm)?
 *   The db handler probably has to be implemented as a long-running, continuing 
task, so that when a user sends some changes the handler writes them to the db 
file. However, I want this to be decoupled from the job. So these file updates 
should be done only locally, on the machine that started the job, for the whole 
lifetime of this user interaction. Does Spark allow creating such long-running 
tasks dynamically, so that when another (web) user starts a new task a new 
long-running task is created and run on the same node, which eventually ends 
and triggers the next task? Also, is it possible to identify a running task, so 
that a long-running task can be bound to a session (db handler working on local 
db updates until the task is done), and eventually restarted / recreated on failure?

~Ben


Re: Is Spark the right tool for me?

2014-12-01 Thread andy petrella
Not quite sure which geo processing you're doing: is it raster or vector? More
info would be appreciated so I can help you further.

Meanwhile I can try to give some hints. For instance, did you consider
GeoMesa?
Since you need a WMS (or the like), did you consider GeoTrellis
(go to the batch processing)?

When you say SQLite, do you mean that you're using SpatiaLite? Or is your db
not a geo one, just plain SQLite? In case you need an R-tree (or
related) index, your headaches will come from congestion within your
database transactions... unless you go to a dedicated database like Vertica
(just mentioning).

kr,
andy



On Mon Dec 01 2014 at 2:49:44 PM Stadin, Benjamin <
benjamin.sta...@heidelberg-mobil.com> wrote:

> Hi all,
>
> I need some advise whether Spark is the right tool for my zoo. My
> requirements share commonalities with „big data“, workflow coordination and
> „reactive“ event driven data processing (as in for example Haskell Arrows),
> which doesn’t make it any easier to decide on a tool set.
>
> NB: I have asked a similar question on the Storm mailing list, but have
> been deferred to Spark. I previously thought Storm was closer to my needs –
> but maybe neither is.
>
> To explain my needs it’s probably best to give an example scenario:
>
>- A user uploads small files (typically 1-200 files, file size
>typically 2-10MB per file)
>- Files should be converted in parallel and on available nodes. The
>conversion is actually done via native tools, so there is not so much big
>data processing required, but dynamic parallelization (so for example to
>split the conversion step into as many conversion tasks as files are
>available). The conversion typically takes between several minutes and a
>few hours.
>- The converted files gathered and are stored in a single database
>(containing geometries for rendering)
>- Once the db is ready, a web map server is (re-)configured and the
>user can make small updates to the data set via a web UI.
>- … Some other data processing steps which I leave away for brevity …
>- There will be initially only a few concurrent users, but the system
>shall be able to scale if needed
>
> My current thoughts:
>
>- I should avoid to upload files into the distributed storage during
>conversion, but probably should rather have each conversion filter download
>the file it is actually converting from a shared place. Other wise it’s bad
>for scalability reasons (too many redundant copies of same temporary files
>if there are many concurrent users and many cluster nodes).
>- Apache Oozie seems an option to chain together my pipes into a
>workflow. But is it a good fit with Spark? What options do I have with
>Spark to chain a workflow from pipes?
>- Apache Crunch seems to make it easy to dynamically parallelize tasks
>(Oozie itself can’t do this). But I may not need crunch after all if I have
>Spark, and it also doesn’t seem to fit to my last problem following.
>- The part that causes me the most headache is the user interactive db
>update: I consider to use Kafka as message bus to broker between the web UI
>and a custom db handler (nb, the db is a SQLite file). But how about
>update responsiveness, isn’t it that Spark will cause some lags (as opposed
>to Storm)?
>- The db handler probably has to be implemented as a long running
>continuing task, so when a user sends some changes the handler writes these
>to the db file. However, I want this to be decoupled from the job. So file
>these updates should be done locally only on the machine that started the
>job for the whole lifetime of this user interaction. Does Spark allow to
>create such long running tasks dynamically, so that when another (web) user
>starts a new task a new long–running task is created and run on the same
>node, which eventually ends and triggers the next task? Also, is it
>possible to identify a running task, so that a long running task can be
>bound to a session (db handler working on local db updates, until task
>done), and eventually restarted / recreated on failure?
>
>
> ~Ben
>


ensuring RDD indices remain immutable

2014-12-01 Thread rok
I have an RDD that serves as a feature look-up table downstream in my
analysis. I create it using zipWithIndex(), and because I suppose that
the elements of the RDD could end up in a different order if it is
regenerated at any point, I cache it to try to ensure that the (feature -->
index) mapping remains fixed. 

However, I'm having trouble verifying that this is actually robust -- can
someone comment on whether such a mapping should be stable, or is there
another preferred method? zipWithUniqueId() isn't optimal since the max ID
generated this way is always greater than the number of features, so I'm
trying to avoid it. 
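
An aside, not from the thread: one way to make the mapping reproducible even if the 
lookup RDD is ever recomputed is to fix an ordering before calling zipWithIndex, so 
the indices no longer depend on caching or partitioning accidents. A sketch, where 
`features` is a hypothetical RDD[String]:

import org.apache.spark.rdd.RDD

// Deterministic (feature -> index) mapping: deduplicate, sort, then index.
val lookup: RDD[(String, Long)] =
  features.distinct().sortBy(identity).zipWithIndex()
lookup.cache()   // caching still helps performance, but correctness no longer relies on it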






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ensuring-RDD-indices-remain-immutable-tp20094.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is Spark the right tool for me?

2014-12-01 Thread Stadin, Benjamin
Thank you for mentioning GeoTrellis. I hadn’t heard of it before. We have 
many custom tools and steps; I’ll check whether our tools fit in. The end result 
is actually a 3D map for native OpenGL-based rendering on iOS / Android [1].

I’m using GeoPackage, which is basically SQLite with R-Tree plus a small library 
around it (more lightweight than SpatiaLite). I want to avoid accessing the 
SQLite db from any other machine or task; that’s where I thought I could use a 
long-running task which is the only process responsible for updating a locally 
stored SQLite db file. As you also said, SQLite (or almost any other file-based 
db) won’t work well over a network. This isn’t limited to R-Tree but is an expected 
limitation because of file locking issues, as also documented by SQLite.

I also thought of doing the same thing when rendering the (web) maps: in 
combination with the db handler which does the actual changes, run a map server 
instance on each node and configure it to add the database location as a map 
source once the task starts.

Cheers
Ben

[1] http://www.deep-map.com

Von: andy petrella mailto:andy.petre...@gmail.com>>
Datum: Montag, 1. Dezember 2014 15:07
An: Benjamin Stadin 
mailto:benjamin.sta...@heidelberg-mobil.com>>,
 "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Betreff: Re: Is Spark the right tool for me?

Not quite sure which geo processing you're doing are they raster, vector? More 
info will be appreciated for me to help you further.

Meanwhile I can try to give some hints, for instance, did you considered 
GeoMesa?
Since you need a WMS (or alike), did you considered 
GeoTrellis (go to the batch processing)?

When you say SQLite, you mean that you're using Spatialite? Or your db is not a 
geo one, and it's simple SQLite. In case you need an r-tree (or related) index, 
you're headaches will come from congestion within your database transaction... 
unless you go to a dedicated database like Vertica (just mentioning)

kr,
andy



On Mon Dec 01 2014 at 2:49:44 PM Stadin, Benjamin 
mailto:benjamin.sta...@heidelberg-mobil.com>>
 wrote:
Hi all,

I need some advise whether Spark is the right tool for my zoo. My requirements 
share commonalities with „big data“, workflow coordination and „reactive“ event 
driven data processing (as in for example Haskell Arrows), which doesn’t make 
it any easier to decide on a tool set.

NB: I have asked a similar question on the Storm mailing list, but have been 
deferred to Spark. I previously thought Storm was closer to my needs – but 
maybe neither is.

To explain my needs it’s probably best to give an example scenario:

 *   A user uploads small files (typically 1-200 files, file size typically 
2-10MB per file)
 *   Files should be converted in parallel and on available nodes. The 
conversion is actually done via native tools, so there is not so much big data 
processing required, but dynamic parallelization (so for example to split the 
conversion step into as many conversion tasks as files are available). The 
conversion typically takes between several minutes and a few hours.
 *   The converted files gathered and are stored in a single database 
(containing geometries for rendering)
 *   Once the db is ready, a web map server is (re-)configured and the user can 
make small updates to the data set via a web UI.
 *   … Some other data processing steps which I leave away for brevity …
 *   There will be initially only a few concurrent users, but the system shall 
be able to scale if needed

My current thoughts:

 *   I should avoid to upload files into the distributed storage during 
conversion, but probably should rather have each conversion filter download the 
file it is actually converting from a shared place. Other wise it’s bad for 
scalability reasons (too many redundant copies of same temporary files if there 
are many concurrent users and many cluster nodes).
 *   Apache Oozie seems an option to chain together my pipes into a workflow. 
But is it a good fit with Spark? What options do I have with Spark to chain a 
workflow from pipes?
 *   Apache Crunch seems to make it easy to dynamically parallelize tasks 
(Oozie itself can’t do this). But I may not need crunch after all if I have 
Spark, and it also doesn’t seem to fit to my last problem following.
 *   The part that causes me the most headache is the user interactive db 
update: I consider to use Kafka as message bus to broker between the web UI and 
a custom db handler (nb, the db is a SQLite file). But how about update 
responsiveness, isn’t it that Spark will cause some lags (as opposed to Storm)?
 *   The db handler probably has to be implemented as a long running continuing 
task, so when a user sends some changes the handler writes these to the db 
file. However, I want this to be decoupled from the job. So file these updates 
should be done locally only on the machine that starte

Re: Is Spark the right tool for me?

2014-12-01 Thread Stadin, Benjamin
… Sorry, I forgot to mention why I’m basically bound to SQLite. The workflow 
involves more data processing steps than I mentioned. There are several tools in 
the chain which either rely on SQLite as an exchange format, or do processing such 
as data cleaning that runs orders of magnitude faster (or uses fewer resources) 
than a heavyweight db would for these specialized (and temporary) tasks.

Von: andy petrella mailto:andy.petre...@gmail.com>>
Datum: Montag, 1. Dezember 2014 15:07
An: Benjamin Stadin 
mailto:benjamin.sta...@heidelberg-mobil.com>>,
 "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Betreff: Re: Is Spark the right tool for me?

Not quite sure which geo processing you're doing are they raster, vector? More 
info will be appreciated for me to help you further.

Meanwhile I can try to give some hints, for instance, did you considered 
GeoMesa?
Since you need a WMS (or alike), did you considered 
GeoTrellis (go to the batch processing)?

When you say SQLite, you mean that you're using Spatialite? Or your db is not a 
geo one, and it's simple SQLite. In case you need an r-tree (or related) index, 
you're headaches will come from congestion within your database transaction... 
unless you go to a dedicated database like Vertica (just mentioning)

kr,
andy



On Mon Dec 01 2014 at 2:49:44 PM Stadin, Benjamin 
mailto:benjamin.sta...@heidelberg-mobil.com>>
 wrote:
Hi all,

I need some advise whether Spark is the right tool for my zoo. My requirements 
share commonalities with „big data“, workflow coordination and „reactive“ event 
driven data processing (as in for example Haskell Arrows), which doesn’t make 
it any easier to decide on a tool set.

NB: I have asked a similar question on the Storm mailing list, but have been 
deferred to Spark. I previously thought Storm was closer to my needs – but 
maybe neither is.

To explain my needs it’s probably best to give an example scenario:

 *   A user uploads small files (typically 1-200 files, file size typically 
2-10MB per file)
 *   Files should be converted in parallel and on available nodes. The 
conversion is actually done via native tools, so there is not so much big data 
processing required, but dynamic parallelization (so for example to split the 
conversion step into as many conversion tasks as files are available). The 
conversion typically takes between several minutes and a few hours.
 *   The converted files are gathered and stored in a single database 
(containing geometries for rendering)
 *   Once the db is ready, a web map server is (re-)configured and the user can 
make small updates to the data set via a web UI.
 *   … Some other data processing steps which I leave away for brevity …
 *   There will be initially only a few concurrent users, but the system shall 
be able to scale if needed

My current thoughts:

 *   I should avoid uploading files into the distributed storage during 
conversion, and should probably instead have each conversion filter download the 
file it is actually converting from a shared place. Otherwise it’s bad for 
scalability (too many redundant copies of the same temporary files if there are 
many concurrent users and many cluster nodes).
 *   Apache Oozie seems an option to chain together my pipes into a workflow. 
But is it a good fit with Spark? What options do I have with Spark to chain a 
workflow from pipes?
 *   Apache Crunch seems to make it easy to dynamically parallelize tasks 
(Oozie itself can’t do this). But I may not need crunch after all if I have 
Spark, and it also doesn’t seem to fit to my last problem following.
 *   The part that causes me the most headache is the user interactive db 
update: I consider to use Kafka as message bus to broker between the web UI and 
a custom db handler (nb, the db is a SQLite file). But how about update 
responsiveness, isn’t it that Spark will cause some lags (as opposed to Storm)?
 *   The db handler probably has to be implemented as a long running continuing 
task, so when a user sends some changes the handler writes these to the db 
file. However, I want this to be decoupled from the job. So file these updates 
should be done locally only on the machine that started the job for the whole 
lifetime of this user interaction. Does Spark allow to create such long running 
tasks dynamically, so that when another (web) user starts a new task a new 
long–running task is created and run on the same node, which eventually ends 
and triggers the next task? Also, is it possible to identify a running task, so 
that a long running task can be bound to a session (db handler working on local 
db updates, until task done), and eventually restarted / recreated on failure?

~Ben


How take top N of top M from RDD as RDD

2014-12-01 Thread Xuefeng Wu
Hi, I have a problem: it is easy in plain Scala code, but I cannot take the top
N from an RDD as an RDD.


There are a large number of StudentScore records; take the top 10 ages, and then
take the top 10 from each age, so the result is 100 records.

The Scala code is below, but how can I do this with RDDs, *given that RDD.take
returns an Array rather than another RDD?*

example Scala code:

import scala.util.Random

case class StudentScore(age: Int, num: Int, score: Int, name: Int)

val scores = for {
  i <- 1 to 1
} yield {
  StudentScore(Random.nextInt(100), Random.nextInt(100),
Random.nextInt(), Random.nextInt())
}


def takeTop(scores: Seq[StudentScore], byKey: StudentScore => Int):
Seq[(Int, Seq[StudentScore])] = {
  val groupedScore = scores.groupBy(byKey)
   .map{case (_, _scores) =>
(_scores.foldLeft(0)((acc, v) => acc + v.score), _scores)}.toSeq
  groupedScore.sortBy(_._1).take(10)
}

val topScores = for {
  (_, ageScores) <- takeTop(scores, _.age)
  (_, numScores) <- takeTop(ageScores, _.num)
} yield {
  numScores
}

topScores.size


-- 

~Yours, Xuefeng Wu/吴雪峰  敬上


Re: Is Spark the right tool for me?

2014-12-01 Thread andy petrella
Indeed. However, I guess the important load and stress is in the processing
of the 3D data (DEM or alike) into geometries/shades/whatever.
Hence you can use spark (geotrellis can be tricky for 3D, poke @lossyrob
for more info) to perform these operations then keep an RDD of only the
resulting geometries.
Those geometries probably won't be that heavy, hence it might be possible to
coalesce(1, true) to have the whole thing on one node (or, if your driver is
more beefy, do a collect/foreach) to create the index.
You could also create a GeoJSON of the geometries and create the r-tree on
it (not sure about this one).
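
For what it's worth, a minimal sketch of that pattern (computeGeometry and
writeBatch below are placeholders for the user's own native tooling and
GeoPackage writer, not real library calls):

import org.apache.spark.{SparkConf, SparkContext}

// The heavy geometry computation stays parallel; the (comparatively small)
// results are then funnelled to a single partition so exactly one task
// touches the local, single-writer database file.
val sc = new SparkContext(
  new SparkConf().setAppName("GeomPipeline").setMaster("local[4]"))

def computeGeometry(path: String): String = s"geometry-for-$path"       // placeholder
def writeBatch(geoms: Iterator[String]): Unit = geoms.foreach(println)  // placeholder

val inputPaths = sc.parallelize(Seq("a.obj", "b.obj", "c.obj"))

inputPaths
  .map(computeGeometry)            // parallel, CPU-heavy part
  .coalesce(1, shuffle = true)     // gather results onto one node
  .foreachPartition(writeBatch)    // single writer, no db lock contention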



On Mon Dec 01 2014 at 3:38:00 PM Stadin, Benjamin <
benjamin.sta...@heidelberg-mobil.com> wrote:

> Thank you for mentioning GeoTrellis. I hadn’t heard of this before. We
> have many custom tools and steps; I’ll check whether our tools fit in. The end
> result is actually a 3D map for native OpenGL based rendering on iOS
> / Android [1].
>
> I’m using GeoPackage which is basically SQLite with R-Tree and a small
> library around it (more lightweight than SpatialLite). I want to avoid
> accessing the SQLite db from any other machine or task, that’s where I
> thought I can use a long running task which is the only process responsible
> to update a local-only stored SQLite db file. As you also said SQLite  (or
> mostly any other file based db) won’t work well over a network. This isn’t
> limited to R-Tree but is an expected limitation because of file locking
> issues, as also documented by SQLite.
>
> I also thought to do the same thing when rendering the (web) maps. In
> combination with the db handler which does the actual changes, I thought to
> run a map server instance on each node, configure it to add the database
> location as map source once the task starts.
>
> Cheers
> Ben
>
> [1] http://www.deep-map.com
>
> Von: andy petrella 
> Datum: Montag, 1. Dezember 2014 15:07
> An: Benjamin Stadin , "
> user@spark.apache.org" 
> Betreff: Re: Is Spark the right tool for me?
>
> Not quite sure which geo processing you're doing are they raster, vector? More
> info will be appreciated for me to help you further.
>
> Meanwhile I can try to give some hints, for instance, did you considered
> GeoMesa ?
> Since you need a WMS (or alike), did you considered GeoTrellis
>  (go to the batch processing)?
>
> When you say SQLite, you mean that you're using Spatialite? Or your db is
> not a geo one, and it's simple SQLite. In case you need an r-tree (or
> related) index, you're headaches will come from congestion within your
> database transaction... unless you go to a dedicated database like Vertica
> (just mentioning)
>
> kr,
> andy
>
>
>
> On Mon Dec 01 2014 at 2:49:44 PM Stadin, Benjamin <
> benjamin.sta...@heidelberg-mobil.com> wrote:
>
>> Hi all,
>>
>> I need some advise whether Spark is the right tool for my zoo. My
>> requirements share commonalities with „big data“, workflow coordination and
>> „reactive“ event driven data processing (as in for example Haskell Arrows),
>> which doesn’t make it any easier to decide on a tool set.
>>
>> NB: I have asked a similar question on the Storm mailing list, but have
>> been deferred to Spark. I previously thought Storm was closer to my needs –
>> but maybe neither is.
>>
>> To explain my needs it’s probably best to give an example scenario:
>>
>>- A user uploads small files (typically 1-200 files, file size
>>typically 2-10MB per file)
>>- Files should be converted in parallel and on available nodes. The
>>conversion is actually done via native tools, so there is not so much big
>>data processing required, but dynamic parallelization (so for example to
>>split the conversion step into as many conversion tasks as files are
>>available). The conversion typically takes between several minutes and a
>>few hours.
>>- The converted files gathered and are stored in a single database
>>(containing geometries for rendering)
>>- Once the db is ready, a web map server is (re-)configured and the
>>user can make small updates to the data set via a web UI.
>>- … Some other data processing steps which I leave away for brevity …
>>- There will be initially only a few concurrent users, but the system
>>shall be able to scale if needed
>>
>> My current thoughts:
>>
>>- I should avoid to upload files into the distributed storage during
>>conversion, but probably should rather have each conversion filter 
>> download
>>the file it is actually converting from a shared place. Other wise it’s 
>> bad
>>for scalability reasons (too many redundant copies of same temporary files
>>if there are many concurrent users and many cluster nodes).
>>- Apache Oozie seems an option to chain together my pipes into a
>>workflow. But is it a good fit with Spark? What options do I have with
>>Spark to chain a workflow from pipes?
>>- Apache Crunch seems to

Re: Is Spark the right tool for me?

2014-12-01 Thread andy petrella
not applicable to your problem, but interesting enough to share on this
thread:
http://basepub.dauphine.fr/bitstream/handle/123456789/5260/SD-Rtree.PDF?sequence=2

On Mon Dec 01 2014 at 3:48:14 PM andy petrella 
wrote:

> Indeed. However, I guess the important load and stress is in the
> processing of the 3D data (DEM or alike) into geometries/shades/whatever.
> Hence you can use spark (geotrellis can be tricky for 3D, poke @lossyrob
> for more info) to perform these operations then keep an RDD of only the
> resulting geometries.
> Those geometries won't probably that heavy, hence it might be possible to
> coalesce(1, true) to have to whole thing on one node (or if your driver is
> more beefy, do a collect/foreach) to create the index.
> You could also create a GeoJSON of the geometries and create the r-tree on
> it (not sure about this one).
>
>
>
> On Mon Dec 01 2014 at 3:38:00 PM Stadin, Benjamin <
> benjamin.sta...@heidelberg-mobil.com> wrote:
>
>> Thank you for mentioning GeoTrellis. I haven’t heard of this before. We
>> have many custom tools and steps, I’ll check our tools fit in. The end
>> result after is actually a 3D map for native OpenGL based rendering on iOS
>> / Android [1].
>>
>> I’m using GeoPackage which is basically SQLite with R-Tree and a small
>> library around it (more lightweight than SpatialLite). I want to avoid
>> accessing the SQLite db from any other machine or task, that’s where I
>> thought I can use a long running task which is the only process responsible
>> to update a local-only stored SQLite db file. As you also said SQLite  (or
>> mostly any other file based db) won’t work well over network. This isn’t
>> only limited to R-Tree but expected limitation because of file locking
>> issues as documented also by SQLite.
>>
>> I also thought to do the same thing when rendering the (web) maps. In
>> combination with the db handler which does the actual changes, I thought to
>> run a map server instance on each node, configure it to add the database
>> location as map source once the task starts.
>>
>> Cheers
>> Ben
>>
>> [1] http://www.deep-map.com
>>
>> Von: andy petrella 
>> Datum: Montag, 1. Dezember 2014 15:07
>> An: Benjamin Stadin , "
>> user@spark.apache.org" 
>> Betreff: Re: Is Spark the right tool for me?
>>
>> Not quite sure which geo processing you're doing are they raster, vector? 
>> More
>> info will be appreciated for me to help you further.
>>
>> Meanwhile I can try to give some hints, for instance, did you considered
>> GeoMesa ?
>> Since you need a WMS (or alike), did you considered GeoTrellis
>>  (go to the batch processing)?
>>
>> When you say SQLite, you mean that you're using Spatialite? Or your db is
>> not a geo one, and it's simple SQLite. In case you need an r-tree (or
>> related) index, you're headaches will come from congestion within your
>> database transaction... unless you go to a dedicated database like Vertica
>> (just mentioning)
>>
>> kr,
>> andy
>>
>>
>>
>> On Mon Dec 01 2014 at 2:49:44 PM Stadin, Benjamin <
>> benjamin.sta...@heidelberg-mobil.com> wrote:
>>
>>> Hi all,
>>>
>>> I need some advise whether Spark is the right tool for my zoo. My
>>> requirements share commonalities with „big data“, workflow coordination and
>>> „reactive“ event driven data processing (as in for example Haskell Arrows),
>>> which doesn’t make it any easier to decide on a tool set.
>>>
>>> NB: I have asked a similar question on the Storm mailing list, but have
>>> been deferred to Spark. I previously thought Storm was closer to my needs –
>>> but maybe neither is.
>>>
>>> To explain my needs it’s probably best to give an example scenario:
>>>
>>>- A user uploads small files (typically 1-200 files, file size
>>>typically 2-10MB per file)
>>>- Files should be converted in parallel and on available nodes. The
>>>conversion is actually done via native tools, so there is not so much big
>>>data processing required, but dynamic parallelization (so for example to
>>>split the conversion step into as many conversion tasks as files are
>>>available). The conversion typically takes between several minutes and a
>>>few hours.
>>>- The converted files gathered and are stored in a single database
>>>(containing geometries for rendering)
>>>- Once the db is ready, a web map server is (re-)configured and the
>>>user can make small updates to the data set via a web UI.
>>>- … Some other data processing steps which I leave away for brevity …
>>>- There will be initially only a few concurrent users, but the
>>>system shall be able to scale if needed
>>>
>>> My current thoughts:
>>>
>>>- I should avoid to upload files into the distributed storage during
>>>conversion, but probably should rather have each conversion filter 
>>> download
>>>the file it is actually converting from a shared place. Other wise it’s 
>>> bad
>>>for scalability reason

Re: ensuring RDD indices remain immutable

2014-12-01 Thread Sean Owen
I think the robust thing to do is sort the RDD, and then zipWithIndex.
Even if the RDD is recomputed, the ordering and thus assignment of IDs
should be the same.
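
For example, a small sketch of that suggestion in the spark-shell (the feature
values here are made up):

// Sort first so the assignment of ids is deterministic, then zip.
// If the RDD is ever recomputed, each feature gets the same index again.
val features = sc.parallelize(Seq("colour", "shape", "size", "weight"))

val featureIndex = features
  .sortBy(x => x)        // stable, deterministic ordering
  .zipWithIndex()        // RDD[(String, Long)] with ids 0..n-1, no gaps
  .cache()               // avoid recomputation where possible

featureIndex.collect().foreach(println)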

On Mon, Dec 1, 2014 at 2:36 PM, rok  wrote:
> I have an RDD that serves as a feature look-up table downstream in my
> analysis. I create it using the zipWithIndex() and because I suppose that
> the elements of the RDD could end up in a different order if it is
> regenerated at any point, I cache it to try and ensure that the (feature -->
> index) mapping remains fixed.
>
> However, I'm having trouble verifying that this is actually robust -- can
> someone comment whether using such a mapping should be stable or is there
> another preferred method? zipWithUniqueID() isn't optimal since max ID
> generated this way is always greater than the number of features so I'm
> trying to avoid it.
>
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/ensuring-RDD-indices-remain-immutable-tp20094.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Time based aggregation in Real time Spark Streaming

2014-12-01 Thread pankaj
Hi,

My incoming messages have a timestamp as one field, and I have to perform
aggregation over a 3-minute time slice.

Message sample

"Item ID" "Item Type" "timeStamp"
1  X   1-12-2014:12:01
1  X   1-12-2014:12:02
1  X   1-12-2014:12:03
1  y   1-12-2014:12:04
1  y   1-12-2014:12:05
1  y   1-12-2014:12:06

Aggregation Result
ItemId  ItemType  count  aggregationStartTime  aggrEndTime
1       X         3      1-12-2014:12:01       1-12-2014:12:03
1       y         3      1-12-2014:12:04       1-12-2014:12:06

What is the best way to perform time-based aggregation in Spark?
Kindly suggest.

Thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Time-based-aggregation-in-Real-time-Spark-Streaming-tp20102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Problem creating EC2 cluster using spark-ec2

2014-12-01 Thread Dave Challis
I've been trying to create a Spark cluster on EC2 using the
documentation at https://spark.apache.org/docs/latest/ec2-scripts.html
(with Spark 1.1.1).

Running the script successfully creates some EC2 instances, HDFS etc.,
but appears to fail to copy the actual files needed to run Spark
across.

I ran the following commands:

$ cd ~/src/spark-1.1.1/ec2
$ ./spark-ec2 --key-pair=* --identity-file=* --slaves=1
--region=eu-west-1 --zone=eu-west-1a --instance-type=m3.medium
--no-ganglia launch foocluster

I see the following in the script's output:

(instance and HDFS set up happens here)
...
Persistent HDFS installed, won't start by default...
~/spark-ec2 ~/spark-ec2
Setting up spark-standalone
RSYNC'ing /root/spark/conf to slaves...
*.eu-west-1.compute.amazonaws.com
RSYNC'ing /root/spark-ec2 to slaves...
*.eu-west-1.compute.amazonaws.com
./spark-standalone/setup.sh: line 22: /root/spark/sbin/stop-all.sh: No
such file or directory
./spark-standalone/setup.sh: line 27:
/root/spark/sbin/start-master.sh: No such file or directory
./spark-standalone/setup.sh: line 33:
/root/spark/sbin/start-slaves.sh: No such file or directory
Setting up tachyon
RSYNC'ing /root/tachyon to slaves...
...
(Tachyon setup happens here without any problem)

I can ssh to the master (using the ./spark-ec2 login), and looking in
/root/, it contains:

$ ls /root
ephemeral-hdfs  hadoop-native  mapreduce  persistent-hdfs  scala
shark  spark  spark-ec2  tachyon

If I look in /root/spark (where the sbin directory should be found),
it only contains a single 'conf' directory:

$ ls /root/spark
conf

Any idea why spark-ec2 might have failed to copy these files across?

Thanks,
Dave

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ensuring RDD indices remain immutable

2014-12-01 Thread rok
True, though I was hoping to avoid having to sort... maybe there's no way
around it. Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ensuring-RDD-indices-remain-immutable-tp20094p20104.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is Spark the right tool for me?

2014-12-01 Thread Stadin, Benjamin
Yes, the processing causes the most stress. But this is parallelizable by 
splitting the input source. My problem is that once the heavy preprocessing is 
done, I’m in a „micro-update“ mode so to say (user-interactive part of the 
whole workflow). Then the map is rendered directly from the SQLite file by the 
map server instance on that machine – this is actually a favorable setup for me 
for resource consumption and implementation costs (I just need to tell the web 
ui to refresh after something was written to the db, and the map server will 
render the updates without me changing / coding anything). So my workflow 
requires breaking out of parallel processing for some time.

Do you think my generalized workflow and tool chain can look like the following?

 1.  Pre-Process many files in a parallel way. Gather all results, deploy them 
on one single machine. => Spark coalesce() + Crunch (for splitting input files 
into separate tasks)
 2.  On the machine where the preprocessed results are, configure a map server 
to connect to the local SQLite source. Do user-interactive micro-updates on 
that file (web UI gets updated).
 3.  Post-process the files in parallel. => Spark + Crunch
 4.  Design all of the above as a workflow, runnable (or assignable) as part of 
a user session. => Oozie

Do you think this is ok?

~Ben


Von: andy petrella mailto:andy.petre...@gmail.com>>
Datum: Montag, 1. Dezember 2014 15:48
An: Benjamin Stadin 
mailto:benjamin.sta...@heidelberg-mobil.com>>,
 "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Betreff: Re: Is Spark the right tool for me?

Indeed. However, I guess the important load and stress is in the processing of 
the 3D data (DEM or alike) into geometries/shades/whatever.
Hence you can use spark (geotrellis can be tricky for 3D, poke @lossyrob for 
more info) to perform these operations then keep an RDD of only the resulting 
geometries.
Those geometries won't probably that heavy, hence it might be possible to 
coalesce(1, true) to have to whole thing on one node (or if your driver is more 
beefy, do a collect/foreach) to create the index.
You could also create a GeoJSON of the geometries and create the r-tree on it 
(not sure about this one).



On Mon Dec 01 2014 at 3:38:00 PM Stadin, Benjamin 
mailto:benjamin.sta...@heidelberg-mobil.com>>
 wrote:
Thank you for mentioning GeoTrellis. I haven’t heard of this before. We have 
many custom tools and steps, I’ll check our tools fit in. The end result after 
is actually a 3D map for native OpenGL based rendering on iOS / Android [1].

I’m using GeoPackage which is basically SQLite with R-Tree and a small library 
around it (more lightweight than SpatialLite). I want to avoid accessing the 
SQLite db from any other machine or task, that’s where I thought I can use a 
long running task which is the only process responsible to update a local-only 
stored SQLite db file. As you also said SQLite  (or mostly any other file based 
db) won’t work well over network. This isn’t only limited to R-Tree but 
expected limitation because of file locking issues as documented also by SQLite.

I also thought to do the same thing when rendering the (web) maps. In 
combination with the db handler which does the actual changes, I thought to run 
a map server instance on each node, configure it to add the database location 
as map source once the task starts.

Cheers
Ben

[1] http://www.deep-map.com

Von: andy petrella mailto:andy.petre...@gmail.com>>
Datum: Montag, 1. Dezember 2014 15:07
An: Benjamin Stadin 
mailto:benjamin.sta...@heidelberg-mobil.com>>,
 "user@spark.apache.org" 
mailto:user@spark.apache.org>>
Betreff: Re: Is Spark the right tool for me?

Not quite sure which geo processing you're doing are they raster, vector? More 
info will be appreciated for me to help you further.

Meanwhile I can try to give some hints, for instance, did you considered 
GeoMesa?
Since you need a WMS (or alike), did you considered 
GeoTrellis (go to the batch processing)?

When you say SQLite, you mean that you're using Spatialite? Or your db is not a 
geo one, and it's simple SQLite. In case you need an r-tree (or related) index, 
you're headaches will come from congestion within your database transaction... 
unless you go to a dedicated database like Vertica (just mentioning)

kr,
andy



On Mon Dec 01 2014 at 2:49:44 PM Stadin, Benjamin 
mailto:benjamin.sta...@heidelberg-mobil.com>>
 wrote:
Hi all,

I need some advise whether Spark is the right tool for my zoo. My requirements 
share commonalities with „big data“, workflow coordination and „reactive“ event 
driven data processing (as in for example Haskell Arrows), which doesn’t make 
it any easier to decide on a tool set.

NB: I have asked a similar question on the Storm mailing list, but have been 
deferred to Spark. I previously thought Storm was closer to my needs – but 
maybe neithe

Re: Spark SQL 1.0.0 - RDD from snappy compress avro file

2014-12-01 Thread cjdc
btw, the same error from above also happens on 1.1.0 (just tested)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-0-0-RDD-from-snappy-compress-avro-file-tp19998p20106.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How take top N of top M from RDD as RDD

2014-12-01 Thread Ritesh Kumar Singh
For converting an Array or any List to a RDD, we can try using :

>sc.parallelize(groupedScore)//or whatever the name of the list
variable is

On Mon, Dec 1, 2014 at 8:14 PM, Xuefeng Wu  wrote:

> Hi, I have a problem, it is easy in Scala code, but I can not take the top
> N from RDD as RDD.
>
>
> There are 1 Student Score, ask take top 10 age, and then take top 10
> from each age, the result is 100 records.
>
> The Scala code is here, but how can I do it in RDD,  *for RDD.take return
> is Array, but other RDD.*
>
> example Scala code:
>
> import scala.util.Random
>
> case class StudentScore(age: Int, num: Int, score: Int, name: Int)
>
> val scores = for {
>   i <- 1 to 1
> } yield {
>   StudentScore(Random.nextInt(100), Random.nextInt(100), Random.nextInt(), 
> Random.nextInt())
> }
>
>
> def takeTop(scores: Seq[StudentScore], byKey: StudentScore => Int): Seq[(Int, 
> Seq[StudentScore])] = {
>   val groupedScore = scores.groupBy(byKey)
>.map{case (_, _scores) => 
> (_scores.foldLeft(0)((acc, v) => acc + v.score), _scores)}.toSeq
>   groupedScore.sortBy(_._1).take(10)
> }
>
> val topScores = for {
>   (_, ageScores) <- takeTop(scores, _.age)
>   (_, numScores) <- takeTop(ageScores, _.num)
> } yield {
>   numScores
> }
>
> topScores.size
>
>
> --
>
> ~Yours, Xuefeng Wu/吴雪峰  敬上
>
>


Re: Spark Job submit

2014-12-01 Thread Matt Narrell
Or setting the HADOOP_CONF_DIR property.  Either way, you must have the YARN 
configuration available to the submitting application to allow for the use of 
“yarn-client” or “yarn-cluster”.
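
For illustration, a minimal Scala sketch of that setup (the same shape applies
from Java via JavaSparkContext); the only assumption is that HADOOP_CONF_DIR /
YARN_CONF_DIR on the submitting machine points at the cluster's core-site.xml,
yarn-site.xml and hdfs-site.xml:

import org.apache.spark.{SparkConf, SparkContext}

// The submitting JVM finds the ResourceManager and HDFS through the
// *-site.xml files referenced by HADOOP_CONF_DIR / YARN_CONF_DIR.
val conf = new SparkConf()
  .setAppName("YarnClientExample")
  .setMaster("yarn-client")        // driver runs locally, executors on YARN

val sc = new SparkContext(conf)
println(sc.parallelize(1 to 100).count())   // trivial sanity check on the cluster
sc.stop()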

The attached stack trace below doesn’t provide any information as to why the 
job failed.

mn

> On Nov 27, 2014, at 12:14 AM, Akhil Das  wrote:
> 
> Try to add your cluster's core-site.xml, yarn-site.xml, and hdfs-site.xml to 
> the CLASSPATH (and on SPARK_CLASSPATH) and submit the job.
> 
> Thanks
> Best Regards
> 
> On Thu, Nov 27, 2014 at 12:24 PM, Naveen Kumar Pokala 
> mailto:npok...@spcapitaliq.com>> wrote:
> Code is in my windows machine and cluster is in some other network in UNIX. 
> In this case how it will identify the cluster. In case of spark cluster we 
> can clearly specify the URL like spark://ip:port. But in case of hadoop how 
> to specify that.
> 
>  
> 
> What I have done is copied the hadoop configuration files from network to 
> local and created dummy hadoop directory(in windows machine).
> 
>  
> 
> Submitted from spark submit by adding above dummy files location with 
> HADOOP_CONF_DIR variable.  Attaching the error.
> 
>  
> 
>  
> 
> 
> 
>  
> 
> Please suggest me how to proceed from the code and how to execute from spark 
> submit from windows machine.
> 
>  
> 
> Please provide me sample code if you have any.
> 
>  
> 
> -Naveen
> 
>  
> 
> From: Akhil Das [mailto:ak...@sigmoidanalytics.com 
> ] 
> Sent: Wednesday, November 26, 2014 10:03 PM
> To: Naveen Kumar Pokala
> Cc: user@spark.apache.org 
> Subject: Re: Spark Job submit
> 
>  
> 
> How about?
> 
>  
> 
> - Create a SparkContext 
> 
> - setMaster as yarn-cluster
> 
> - Create a JavaSparkContext with the above SparkContext
> 
>  
> 
> And that will submit it to the yarn cluster.
> 
> 
> 
> Thanks
> 
> Best Regards
> 
>  
> 
> On Wed, Nov 26, 2014 at 4:20 PM, Naveen Kumar Pokala  > wrote:
> 
> Hi.
> 
>  
> 
> Is there a way to submit spark job on Hadoop-YARN  cluster from java code.
> 
>  
> 
> -Naveen
> 
>  
> 
> 



Re: Time based aggregation in Real time Spark Streaming

2014-12-01 Thread Bahubali Jain
Hi,
You can associate all the messages of a 3-minute interval with a unique key,
then group by that key and finally add up.
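
A rough sketch of that idea for the spark-shell (the field names, the timestamp
format and the sample records are assumptions based on the example in the post;
the same reduceByKey call also works on a pair DStream in a streaming job):

import java.text.SimpleDateFormat

// Key every record by the start of its 3-minute slice, then count per key.
case class Event(itemId: Int, itemType: String, ts: String)

val sliceMs = 3 * 60 * 1000L

def bucketStart(ts: String): Long = {
  val t = new SimpleDateFormat("d-M-yyyy:HH:mm").parse(ts).getTime
  t - (t % sliceMs)                        // floor to the 3-minute boundary
}

val events = sc.parallelize(Seq(
  Event(1, "X", "1-12-2014:12:01"), Event(1, "X", "1-12-2014:12:02"),
  Event(1, "X", "1-12-2014:12:03"), Event(1, "y", "1-12-2014:12:04"),
  Event(1, "y", "1-12-2014:12:05"), Event(1, "y", "1-12-2014:12:06")))

val aggregated = events
  .map(e => ((e.itemId, e.itemType, bucketStart(e.ts)), 1))
  .reduceByKey(_ + _)                      // count per (item, type, slice)
  .map { case ((id, tp, start), n) =>
    (id, tp, n, new java.util.Date(start), new java.util.Date(start + sliceMs - 1))
  }

aggregated.collect().foreach(println)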

Thanks
On Dec 1, 2014 9:02 PM, "pankaj"  wrote:

> Hi,
>
> My incoming message has time stamp as one field and i have to perform
> aggregation over 3 minute of time slice.
>
> Message sample
>
> "Item ID" "Item Type" "timeStamp"
> 1  X   1-12-2014:12:01
> 1  X   1-12-2014:12:02
> 1  X   1-12-2014:12:03
> 1  y   1-12-2014:12:04
> 1  y   1-12-2014:12:05
> 1  y   1-12-2014:12:06
>
> Aggregation Result
> ItemIdItemType  count   aggregationStartTimeaggrEndTime
> 1  X 3  1-12-2014:12:01
>   1-12-2014:12:03
> 1  y  3   1-12-2014:12:04
>  1-12-2014:12:06
>
> What is the best way to perform time based aggregation in spark.
> Kindly suggest.
>
> Thanks
>
> --
> View this message in context: Time based aggregation in Real time Spark
> Streaming
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Mllib native netlib-java/OpenBLAS

2014-12-01 Thread agg212
Thanks for your reply, but I'm still running into issues
installing/configuring the native libraries for MLlib.  Here are the steps
I've taken, please let me know if anything is incorrect.

- Download Spark source
- unzip and compile using `mvn -Pnetlib-lgpl -DskipTests clean package `
- Run `sbt/sbt publish-local`

The last step fails with the following error (the full stack trace is attached
here: error.txt):
[error] (sql/compile:compile) java.lang.AssertionError: assertion failed:
List(object package$DebugNode, object package$DebugNode)

Do I still have to install OPENBLAS/anything else if I build Spark from the
source using the -Pnetlib-lgpl flag?  Also, do I change the Spark version
(from 1.1.0 to 1.2.0-SNAPSHOT) in the .sbt file for my app?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-native-netlib-java-OpenBLAS-tp19662p20110.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Time based aggregation in Real time Spark Streaming

2014-12-01 Thread pankaj
Hi,

Suppose I keep a batch size of 3 minutes. Within one batch there can be incoming
records with any timestamp, so it is difficult to keep track of when the 3-minute
interval started and ended. I am doing the output operation on the worker nodes in
foreachPartition, not in the driver (foreachRDD), so I cannot use any shared
variable to store the start/end time, because shared variables like accumulators
are write-only inside a task.

Is there any solution for this?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Time-based-aggregation-in-Real-time-Spark-Streaming-tp20102p20111.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



packaging from source gives protobuf compatibility issues.

2014-12-01 Thread akhandeshi
scala> textFile.count()
java.lang.VerifyError: class
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$CompleteReques
tProto overrides final method
getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;

I tried ./make-distribution.sh -Dhadoop.version=2.5.0 and
/usr/local/apache-maven-3.2.3/bin/mvn -Dhadoop.version=2.5.0 -DskipTests
clean package; both give the same error.  I am connecting to HDFS hosted
on Hadoop version 2.5.0.

I will appreciate any help anyone can provide!
Thanks,

Ami



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/packaging-from-source-gives-protobuf-compatibility-issues-tp20112.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How take top N of top M from RDD as RDD

2014-12-01 Thread Debasish Das
rdd.top collects it on master...

If you want topk for a key run map / mappartition and use a bounded
priority queue and reducebykey the queues.

I experimented with topk from algebird and bounded priority queue wrapped
over jpriority queue ( spark default)...bpq is faster

Code example is here:

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3066
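
A rough sketch of the per-key top-k idea with plain reduceByKey (a bounded
priority queue as in the JIRA above is more efficient; this just shows the
shape, for the spark-shell, with made-up data sizes):

import scala.util.Random

// Keep only the k highest-scoring records per key on each side of the
// shuffle, so nothing is collected to the driver and the result stays an RDD.
case class StudentScore(age: Int, num: Int, score: Int, name: Int)

def mergeTop(k: Int)(a: List[StudentScore], b: List[StudentScore]): List[StudentScore] =
  (a ++ b).sortBy(-_.score).take(k)

val scores = sc.parallelize(Seq.fill(10000)(
  StudentScore(Random.nextInt(100), Random.nextInt(100),
    Random.nextInt(1000), Random.nextInt(1000))))

// top 10 scores within each age, still an RDD
val top10PerAge = scores
  .map(s => (s.age, List(s)))
  .reduceByKey(mergeTop(10))
  .flatMap { case (_, top) => top }

println(top10PerAge.count())   // at most 100 ages * 10 records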
On Dec 1, 2014 6:46 AM, "Xuefeng Wu"  wrote:

> Hi, I have a problem, it is easy in Scala code, but I can not take the top
> N from RDD as RDD.
>
>
> There are 1 Student Score, ask take top 10 age, and then take top 10
> from each age, the result is 100 records.
>
> The Scala code is here, but how can I do it in RDD,  *for RDD.take return
> is Array, but other RDD.*
>
> example Scala code:
>
> import scala.util.Random
>
> case class StudentScore(age: Int, num: Int, score: Int, name: Int)
>
> val scores = for {
>   i <- 1 to 1
> } yield {
>   StudentScore(Random.nextInt(100), Random.nextInt(100), Random.nextInt(), 
> Random.nextInt())
> }
>
>
> def takeTop(scores: Seq[StudentScore], byKey: StudentScore => Int): Seq[(Int, 
> Seq[StudentScore])] = {
>   val groupedScore = scores.groupBy(byKey)
>.map{case (_, _scores) => 
> (_scores.foldLeft(0)((acc, v) => acc + v.score), _scores)}.toSeq
>   groupedScore.sortBy(_._1).take(10)
> }
>
> val topScores = for {
>   (_, ageScores) <- takeTop(scores, _.age)
>   (_, numScores) <- takeTop(ageScores, _.num)
> } yield {
>   numScores
> }
>
> topScores.size
>
>
> --
>
> ~Yours, Xuefeng Wu/吴雪峰  敬上
>
>


RE: Unable to compile spark 1.1.0 on windows 8.1

2014-12-01 Thread Judy Nash
Have you checked out the wiki here? 
http://spark.apache.org/docs/latest/building-with-maven.html

A couple things I did differently from you:
1) I got the bits directly from github (https://github.com/apache/spark/). Use 
branch 1.1 for spark 1.1
2) execute maven command on cmd (powershell misses libraries sometimes) 
3) Increase maven memory per suggested by building with maven wiki

Hope this helps. 

-Original Message-
From: Ishwardeep Singh [mailto:ishwardeep.si...@impetus.co.in] 
Sent: Monday, December 1, 2014 1:50 AM
To: u...@spark.incubator.apache.org
Subject: RE: Unable to compile spark 1.1.0 on windows 8.1

Hi Judy,

Thank you for your response.

When I try to compile using maven "mvn -Dhadoop.version=1.2.1 -DskipTests clean 
package" I get an error "Error: Could not find or load main class" . 
I have maven 3.0.4.

And when I run command "sbt package" I get the same exception as earlier.

I have done the following steps:

1. Download spark-1.1.0.tgz from the spark site and unzip the compressed zip to 
a folder "d:\myworkplace\software\spark-1.1.0"
2. Then I downloaded sbt-0.13.7.zip and extract it to folder 
"d:\myworkplace\software\sbt"
3. Update the PATH environment variable to include 
"d:\myworkplace\software\sbt\bin" in the PATH.
4. Navigate to spark folder d:\myworkplace\software\spark-1.1.0
5. Run the command "sbt assembly"
6. As a side effect of this command a number of libraries are downloaded and I 
get an initial error that path 
C:\Users\ishwardeep.singh\.sbt\0.13\staging\ec3aa8f39111944cc5f2\sbt-pom-reader
does not exist. 
7. I manually create this subfolder "ec3aa8f39111944cc5f2\sbt-pom-reader"
and retry to get the next error as described in my initial error.

Is this the correct procedure to compile spark 1.1.0? Please let me know.

Hoping to hear from you soon.

Regards,
ishwardeep



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-compile-spark-1-1-0-on-windows-8-1-tp19996p20075.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Summit East CFP - 5 days until deadline

2014-12-01 Thread Scott walent
The inaugural Spark Summit East (spark-summit.org/east), an event to bring
the Apache Spark community together, will be in New York City on March 18,
2015. The call for submissions is currently open, but will close this
Friday December 5, at 11:59pm PST.   The summit is looking for talks that
will cover topics including applications, development, research, and data
science.

At the Summit you can look forward to hearing from committers, developers,
CEOs, and companies who are solving real-world big data challenges with
Spark.

All submissions will be reviewed by a Program Committee that is made up of
the creators, top committers and individuals who have heavily contributed
to the Spark project. No speaker slots are being sold to sponsors, in an
effort to keep the Summit a community-driven event.

To submit your abstracts please visit: spark-summit.org/east/2015/cfp

Looking forward to seeing you there!

Best,
Scott & The Spark Summit Organizers


How to Integrate openNLP with Spark

2014-12-01 Thread Nikhil
Hi,

I am using openNLP NER (Token Name Finder) for parsing unstructured data. In
order to speed up my process (to quickly train models and analyze the documents
with those models), I want to use Spark. I saw on the web that it is possible to
connect openNLP with Spark using UIMAFit, but I am not sure how to do so. Though
Philip Ogren gave a very nice presentation at the Spark Summit, I am still
confused.

Can someone please provide an end-to-end example of this? I am new to Spark and
UIMAFit and recently started working with them.

Thanks

Nikhil Jain



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-Integrate-openNLP-with-Spark-tp20117.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Creating a SchemaRDD from an existing API

2014-12-01 Thread Michael Armbrust
No, it should support any data source that has a schema and can produce
rows.
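
For the simpler case of turning an arbitrary row-producing source into a
SchemaRDD without the new external data sources API, a minimal spark-shell
sketch with applySchema (the data here is made up; in practice the rows would
come from your API):

import org.apache.spark.sql._

// Any source that can hand you rows plus a schema can become a SchemaRDD.
val sqlContext = new SQLContext(sc)

val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age",  IntegerType, nullable = false)))

// Stand-in for rows pulled from the external datasource
val rowRDD = sc.parallelize(Seq(Row("alice", 30), Row("bob", 25)))

val people = sqlContext.applySchema(rowRDD, schema)
people.registerTempTable("people")

sqlContext.sql("SELECT name FROM people WHERE age > 26").collect().foreach(println)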

On Mon, Dec 1, 2014 at 1:34 AM, Niranda Perera  wrote:

> Hi Michael,
>
> About this new data source API, what type of data sources would it
> support? Does it have to be RDBMS necessarily?
>
> Cheers
>
> On Sat, Nov 29, 2014 at 12:57 AM, Michael Armbrust  > wrote:
>
>> You probably don't need to create a new kind of SchemaRDD.  Instead I'd
>> suggest taking a look at the data sources API that we are adding in Spark
>> 1.2.  There is not a ton of documentation, but the test cases show how
>> to implement the various interfaces
>> ,
>> and there is an example library for reading Avro data
>> .
>>
>> On Thu, Nov 27, 2014 at 10:31 PM, Niranda Perera 
>> wrote:
>>
>>> Hi,
>>>
>>> I am evaluating Spark for an analytic component where we do batch
>>> processing of data using SQL.
>>>
>>> So, I am particularly interested in Spark SQL and in creating a SchemaRDD
>>> from an existing API [1].
>>>
>>> This API exposes elements in a database as datasources. Using the methods
>>> allowed by this data source, we can access and edit data.
>>>
>>> So, I want to create a custom SchemaRDD using the methods and provisions
>>> of
>>> this API. I tried going through Spark documentation and the Java Docs,
>>> but
>>> unfortunately, I was unable to come to a final conclusion if this was
>>> actually possible.
>>>
>>> I would like to ask the Spark Devs,
>>> 1. As of the current Spark release, can we make a custom SchemaRDD?
>>> 2. What is the extension point to a custom SchemaRDD? or are there
>>> particular interfaces?
>>> 3. Could you please point me the specific docs regarding this matter?
>>>
>>> Your help in this regard is highly appreciated.
>>>
>>> Cheers
>>>
>>> [1]
>>>
>>> https://github.com/wso2-dev/carbon-analytics/tree/master/components/xanalytics
>>>
>>> --
>>> *Niranda Perera*
>>> Software Engineer, WSO2 Inc.
>>> Mobile: +94-71-554-8430
>>> Twitter: @n1r44 
>>>
>>
>>
>
>
> --
> *Niranda Perera*
> Software Engineer, WSO2 Inc.
> Mobile: +94-71-554-8430
> Twitter: @n1r44 
>


Minimum cluster size for empirical testing

2014-12-01 Thread Valdes, Pablo
Hi everyone,

I’m interested in empirically measuring how much faster Spark works in comparison 
to Hadoop for certain problems and an input corpus I currently work with (I’ve read 
Matei Zaharia’s “Resilient Distributed Datasets: A Fault-Tolerant Abstraction 
for In-Memory Cluster Computing” paper and I want to perform a similar test). I 
personally think measuring the difference in speed on a single 1-node cluster 
isn’t enough, so I was wondering what you would recommend for this task in terms 
of number of nodes, specs, etc.
I was thinking it could be possible to launch a couple of CDH5 VMs across a few 
computers, or do you think it would be easier to do it with Amazon EC2?

I’m particularly interested in knowing what the overall experience is in this 
case and what your recommendations are (what other common problems to test and 
what kind of benchmarks).

Have a great start of the week.
Cheers



Pablo Valdes Software Engineer | comScore, Inc. (NASDAQ:SCOR)

pval...@comscore.com



Av. Del Cóndor N° 520, oficina 202, Ciudad Empresarial, Comuna de Huechuraba, | 
Santiago | CL

...

comScore is a global leader in digital media analytics. We make audiences and 
advertising more valuable. To learn more, visit 
www.comscore.com




Re: How to use FlumeInputDStream in spark cluster?

2014-12-01 Thread Ping Tang
Thank you very much for your reply.

I have a cluster of 8 nodes: m1, m2, m3, ..., m8. m1 is configured as the Spark 
master node; the rest of the nodes are all worker nodes. I also configured m3 as 
the History Server, but the history server fails to start. I ran FlumeEventCount 
on m1 using the right hostname and a port that is not used by any application. 
Here is the script I used to run FlumeEventCount:


#!/bin/bash


spark-submit --verbose --class 
org.apache.spark.examples.streaming.FlumeEventCount --deploy-mode client 
--master yarn-client --jars 
lib/spark-streaming-flume_2.10-1.1.0-cdh5.2.2-20141112.193826-1.jar 
/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/spark/examples/lib/spark-examples-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar
 m1.ptang.aerohive.com 1234


The same issue is observed after adding "spark.ui.port=4321" to 
/etc/spark/conf/spark-defaults.conf. The following are the exceptions from the job 
run:


14/12/01 11:51:45 INFO JobScheduler: Finished job streaming job 1417463504000 
ms.0 from job set of time 1417463504000 ms

14/12/01 11:51:45 INFO JobScheduler: Total delay: 1.465 s for time 
1417463504000 ms (execution: 1.415 s)

14/12/01 11:51:45 ERROR ReceiverTracker: Deregistered receiver for stream 0: 
Error starting receiver 0 - org.jboss.netty.channel.ChannelException: Failed to 
bind to: m1.ptang.aerohive.com/192.168.10.22:1234

at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)

at org.apache.avro.ipc.NettyServer.(NettyServer.java:106)

at org.apache.avro.ipc.NettyServer.(NettyServer.java:119)

at org.apache.avro.ipc.NettyServer.(NettyServer.java:74)

at org.apache.avro.ipc.NettyServer.(NettyServer.java:68)

at 
org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)

at 
org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)

at 
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)

at 
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)

at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)

at 
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)

at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)

at 
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)

at org.apache.spark.scheduler.Task.run(Task.scala:54)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.net.BindException: Cannot assign requested address

at sun.nio.ch.Net.bind0(Native Method)

at sun.nio.ch.Net.bind(Net.java:444)

at sun.nio.ch.Net.bind(Net.java:436)

at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)

at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)

at 
org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)

at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)

at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)

at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)

... 3 more


14/12/01 11:51:45 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 72, 
m8.ptang.aerohive.com): org.jboss.netty.channel.ChannelException: Failed to 
bind to: m1.ptang.aerohive.com/192.168.10.22:1234

org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)

org.apache.avro.ipc.NettyServer.(NettyServer.java:106)

org.apache.avro.ipc.NettyServer.(NettyServer.java:119)

org.apache.avro.ipc.NettyServer.(NettyServer.java:74)

org.apache.avro.ipc.NettyServer.(NettyServer.java:68)


org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:164)


org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:171)


org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)


org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)


org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)


org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)


org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)


org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)

org.apache.spark.scheduler.Resu

Remove added jar from spark context

2014-12-01 Thread ankits
Hi,

Is there a way to REMOVE a jar (added via addJar) from a Spark context? We have
a long running context used by the spark jobserver, but after trying to
update versions of classes already in the class path via addJars, the
context still runs the old versions. It would be helpful if I could remove
the old jar from the context when adding the new one to prevent running
stale code.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Remove-added-jar-from-spark-context-tp20121.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Inaccurate Estimate of weights model from StreamingLinearRegressionWithSGD

2014-12-01 Thread Bui, Tri
Thanks Yanbo!  That works!

The only issue is that it won’t print the predicted values for lp.features from 
the code line below.

model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

It prints the test input data correctly, but it keeps printing “0.0” as the 
predicted value computed from lp.features.

Thanks
Tri

From: Yanbo Liang [mailto:yanboha...@gmail.com]
Sent: Thursday, November 27, 2014 12:22 AM
To: Bui, Tri
Cc: user@spark.apache.org
Subject: Re: Inaccurate Estimate of weights model from 
StreamingLinearRegressionWithSGD

Hi Tri,

Maybe my latest response to your problem was lost; anyway, the following code 
snippet runs correctly.

val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt))

model.algorithm.setIntercept(true)

Because all setXXX() functions in StreamingLinearRegressionWithSGD return 
this.type, which is an instance of itself, we need to set other configuration on 
a separate line without using the return value.

2014-11-27 1:04 GMT+08:00 Bui, Tri 
mailto:tri@verizonwireless.com.invalid>>:
Thanks Yanbo!

Modified code below:

val conf = new 
SparkConf().setMaster("local[2]").setAppName("StreamingLinearRegression")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt)).setNumIterations(args(4).toInt).setStepSize(.0001).algorithm.setIntercept(true)
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()

But I am getting compile error:
[error] 
/data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:54:
 value trainOn is not a member
of org.apache.spark.mllib.regression.LinearRegressionWithSGD
[error] model.trainOn(trainingData)
[error]   ^
[error] 
/data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:55:
 value predictOnValues is not a
member of org.apache.spark.mllib.regression.LinearRegressionWithSGD
[error] model.predictOnValues(testData.map(lp => (lp.label, 
lp.features))).print()
[error]   ^
[error] two errors found
[error] (compile:compile) Compilation failed

Thanks
Tri

From: Yanbo Liang [mailto:yanboha...@gmail.com]
Sent: Tuesday, November 25, 2014 8:57 PM
To: Bui, Tri
Cc: user@spark.apache.org
Subject: Re: Inaccurate Estimate of weights model from 
StreamingLinearRegressionWithSGD

Hi Tri,

setIntercept() is not a member function of StreamingLinearRegressionWithSGD, 
it's a member function of LinearRegressionWithSGD(GeneralizedLinearAlgorithm) 
which is a member variable(named algorithm) of StreamingLinearRegressionWithSGD.

So you need to change your code to:
val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt))
.algorithm.setIntercept(true)

Thanks
Yanbo


2014-11-25 23:51 GMT+08:00 Bui, Tri 
mailto:tri@verizonwireless.com.invalid>>:
Thanks Liang!

It was my bad, I fat finger one of the data point, correct it and the result 
match with yours.

I am still not able to get the intercept.  I am getting   [error] 
/data/project/LinearRegression/src/main/scala/StreamingLinearRegression.scala:47:
 value setIntercept
mber of org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

I try code below:
val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt))
model.setIntercept(addIntercept = true).trainOn(trainingData)

and:

val model = new 
StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(args(3).toInt))
.setIntercept(true)

But still get compilation error.

Thanks
Tri




From: Yanbo Liang [mailto:yanboha...@gmail.com]
Sent: Tuesday, November 25, 2014 4:08 AM
To: Bui, Tri
Cc: user@spark.apache.org
Subject: Re: Inaccurate Estimate of weights model from 
StreamingLinearRegressionWithSGD

The case run correctly in my environment.

14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Model 
updated at time 141690890 ms
14/11/25 17:48:20 INFO regression.StreamingLinearRegressionWithSGD: Current 
model: weights, [0.8588]

Can you provide more detail information if it is convenience?

Turn on the intercept value can be set as following:
val model = new StreamingLinearRegressionWithSGD()
  .algorithm.setIntercept(true)

2014-11-25 3:31 GMT+08:00 Bui, Tri 
mailto:tri@verizonwireless.com.invalid>>:
Hi,

I am getting incorrect weights model from StreamingLinearRegressionwith SGD.

One feature Input data is:

(1,[1])
(2,[2])
…
.
(20,[20])

The result from the Current model: weights is [-4.432]….which is not correct.

Also, how do 

StreamingLinearRegressionWithSGD

2014-12-01 Thread Joanne Contact
Hi Gurus,

I did not look at the code yet. I wonder if StreamingLinearRegressionWithSGD
is equivalent to LinearRegressionWithSGD with the starting weights of the
current batch set to the ending weights of the last batch?

Since RidgeRegressionModel does not seem to have a streaming version, I just
wonder if this approach will suffice.


Thanks!

J


Spark SQL table Join, one task is taking long

2014-12-01 Thread Venkat Subramanian
Environment: Spark 1.1, 4 Node Spark and Hadoop Dev cluster - 6 cores, 32 GB
Ram each. Default serialization, Standalone, no security

Data was sqooped from a relational DB to HDFS, and the data is partitioned
uniformly across HDFS. I am reading a fact table about 8 GB in size and one small
dim table from HDFS, then doing a join on them based on a criterion. I am
running the driver in the Spark shell on the Spark master.

ContactDetail and DAgents are read as RDD and registered as table already.
Each of these tables have 60 to 90 fields and I am using Product class.

val CDJoinQry= sqlContext.sql("SELECT  * FROM ContactDetail, DAgents  WHERE
ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")

CDJoinQry.map(ta => ta(4)).count   // result is a small number

This works fine and returns the correct result. The Hadoop mapPartition reads and
the creation of RDDs are all fine. But in the count stage, I see that one of the
tasks (out of 200) does a huge amount of shuffle write (some 1 GB or more)
and takes about 1.1 seconds to complete out of the 1.2 seconds of total
execution time. This task is usually around the 3/4 mark (say 160/200) of
the total tasks. While that task is running, one of the CPUs on one
worker node goes to 100% for the duration of the task. The rest of the tasks
take a few ms and do only < 5 MB of shuffle write.  I have run it
repeatedly and this happens regardless of which worker node this particular
task runs on. I turned on Spark debug logging on all nodes to understand, but
it was difficult to figure out from the logs where the delay comes from. There are
no errors or retries in the logs.

Not sure if I can post the logs here for someone to look at; if so, I can (about
10 MB). Also, I am not sure whether it is normal in such a table join for one task
to take most of the time. Let me know if you have any suggestions.

Regards,

Venkat




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



hdfs streaming context

2014-12-01 Thread Benjamin Cuthbert
All,

Is it possible to stream on an HDFS directory and listen for multiple files?

I have tried the following

val sparkConf = new SparkConf().setAppName("HdfsWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val lines = ssc.textFileStream("hdfs://localhost:8020/user/data/*")
lines.filter(line => line.contains("GE"))
lines.print()
ssc.start()

But I get

14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
1417469742000 ms
java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does not 
exist.
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
at 
org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
at 
org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: hdfs streaming context

2014-12-01 Thread Bui, Tri
Try 

("hdfs:///localhost:8020/user/data/*") 

With 3 "/".

Thx
tri

-Original Message-
From: Benjamin Cuthbert [mailto:cuthbert@gmail.com] 
Sent: Monday, December 01, 2014 4:41 PM
To: user@spark.apache.org
Subject: hdfs streaming context

All,

Is it possible to stream on HDFS directory and listen for multiple files?

I have tried the following

val sparkConf = new SparkConf().setAppName("HdfsWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = 
ssc.textFileStream("hdfs://localhost:8020/user/data/*")
lines.filter(line => line.contains("GE"))
lines.print()
ssc.start()

But I get

14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
1417469742000 ms
java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does not 
exist.
at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
at 
org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
at 
org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: hdfs streaming context

2014-12-01 Thread Andy Twigg
Have you tried just passing a path to ssc.textFileStream()? It
monitors the path for new files by looking at mtime/atime; all
new/touched files in the time window appear as an RDD in the DStream.

On 1 December 2014 at 14:41, Benjamin Cuthbert  wrote:
> All,
>
> Is it possible to stream on HDFS directory and listen for multiple files?
>
> I have tried the following
>
> val sparkConf = new SparkConf().setAppName("HdfsWordCount")
> val ssc = new StreamingContext(sparkConf, Seconds(2))
> val lines = ssc.textFileStream("hdfs://localhost:8020/user/data/*")
> lines.filter(line => line.contains("GE"))
> lines.print()
> ssc.start()
>
> But I get
>
> 14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
> 1417469742000 ms
> java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does not 
> exist.
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: hdfs streaming context

2014-12-01 Thread Sean Owen
Yes, in fact, that's the only way it works. You need
"hdfs://localhost:8020/user/data", I believe.

(No it's not correct to write "hdfs:///...")

On Mon, Dec 1, 2014 at 10:41 PM, Benjamin Cuthbert
 wrote:
> All,
>
> Is it possible to stream on HDFS directory and listen for multiple files?
>
> I have tried the following
>
> val sparkConf = new SparkConf().setAppName("HdfsWordCount")
> val ssc = new StreamingContext(sparkConf, Seconds(2))
> val lines = ssc.textFileStream("hdfs://localhost:8020/user/data/*")
> lines.filter(line => line.contains("GE"))
> lines.print()
> ssc.start()
>
> But I get
>
> 14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
> 1417469742000 ms
> java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does not 
> exist.
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: hdfs streaming context

2014-12-01 Thread Benjamin Cuthbert
Thanks Sean,

That worked just removing the /* and leaving it as /user/data

Seems to be streaming in.


> On 1 Dec 2014, at 22:50, Sean Owen  wrote:
> 
> Yes, in fact, that's the only way it works. You need
> "hdfs://localhost:8020/user/data", I believe.
> 
> (No it's not correct to write "hdfs:///...")
> 
> On Mon, Dec 1, 2014 at 10:41 PM, Benjamin Cuthbert
>  wrote:
>> All,
>> 
>> Is it possible to stream on HDFS directory and listen for multiple files?
>> 
>> I have tried the following
>> 
>> val sparkConf = new SparkConf().setAppName("HdfsWordCount")
>> val ssc = new StreamingContext(sparkConf, Seconds(2))
>> val lines = ssc.textFileStream("hdfs://localhost:8020/user/data/*")
>> lines.filter(line => line.contains("GE"))
>> lines.print()
>> ssc.start()
>> 
>> But I get
>> 
>> 14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
>> 1417469742000 ms
>> java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does 
>> not exist.
>>at 
>> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
>>at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
>>at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
>>at 
>> org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
>>at 
>> org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:75)
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: hdfs streaming context

2014-12-01 Thread Bui, Tri
For the streaming example I am working on, it accepted ("hdfs:///user/data") 
without the localhost info.  

Let me dig through my hdfs config.





-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Monday, December 01, 2014 4:50 PM
To: Benjamin Cuthbert
Cc: user@spark.apache.org
Subject: Re: hdfs streaming context

Yes, in fact, that's the only way it works. You need 
"hdfs://localhost:8020/user/data", I believe.

(No it's not correct to write "hdfs:///...")

On Mon, Dec 1, 2014 at 10:41 PM, Benjamin Cuthbert  
wrote:
> All,
>
> Is it possible to stream on HDFS directory and listen for multiple files?
>
> I have tried the following
>
> val sparkConf = new SparkConf().setAppName("HdfsWordCount")
> val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = 
> ssc.textFileStream("hdfs://localhost:8020/user/data/*")
> lines.filter(line => line.contains("GE"))
> lines.print()
> ssc.start()
>
> But I get
>
> 14/12/01 21:35:42 ERROR JobScheduler: Error generating jobs for time 
> 1417469742000 ms
> java.io.FileNotFoundException: File hdfs://localhost:8020/user/data/*does not 
> exist.
> at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:408)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1416)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1456)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:107)
> at 
> org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputD
> Stream.scala:75)
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



Re: hdfs streaming context

2014-12-01 Thread Sean Owen
Yes but you can't follow three slashes with host:port. No host
probably defaults to whatever is found in your HDFS config.

On Mon, Dec 1, 2014 at 11:02 PM, Bui, Tri  wrote:
> For the streaming example I am working on, Its accepted ("hdfs:///user/data") 
> without the localhost info.
>
> Let me dig through my hdfs config.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: hdfs streaming context

2014-12-01 Thread Bui, Tri
Yep. No localhost.

Usually, I use hdfs:///user/data to indicate I want HDFS, or file:///user/data 
to indicate a local file directory.  



-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Monday, December 01, 2014 5:06 PM
To: Bui, Tri
Cc: Benjamin Cuthbert; user@spark.apache.org
Subject: Re: hdfs streaming context

Yes but you can't follow three slashes with host:port. No host probably 
defaults to whatever is found in your HDFS config.

On Mon, Dec 1, 2014 at 11:02 PM, Bui, Tri  wrote:
> For the streaming example I am working on, Its accepted ("hdfs:///user/data") 
> without the localhost info.
>
> Let me dig through my hdfs config.
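
To summarise the thread: textFileStream() takes a directory
(hdfs://host:port/dir, or hdfs:///dir if the host comes from your HDFS
config), not a glob, and every new file that appears in that directory shows
up in the next batch. A minimal sketch of the working version of the snippet
above:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("HdfsWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Point textFileStream at the directory itself; no wildcard.
val lines = ssc.textFileStream("hdfs://localhost:8020/user/data")

// filter() returns a new DStream, so keep its result before printing.
val geLines = lines.filter(line => line.contains("GE"))
geLines.print()

ssc.start()
ssc.awaitTermination()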


Re: How take top N of top M from RDD as RDD

2014-12-01 Thread Xuefeng Wu
hi Debasish,

I found this test code in the map transform; 
would it collect all products too?

+ val sortedProducts = products.toArray.sorted(ord.reverse)


Yours, Xuefeng Wu 吴雪峰 敬上

> On 2 Dec 2014, at 1:33 AM, Debasish Das  wrote:
> 
> rdd.top collects it on master...
> 
> If you want topk for a key run map / mappartition and use a bounded priority 
> queue and reducebykey the queues.
> 
> I experimented with topk from algebird and bounded priority queue wrapped 
> over jpriority queue ( spark default)...bpq is faster
> 
> Code example is here:
> 
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-3066
> 
>> On Dec 1, 2014 6:46 AM, "Xuefeng Wu"  wrote:
>> Hi, I have a problem. It is easy in plain Scala code, but I cannot take the top N 
>> from an RDD as an RDD.
>> 
>> 
>> There are 1 StudentScore records; the task is to take the top 10 ages, and then the top 10 
>> from each age, so the result is 100 records.
>>  
>> The Scala code is below, but how can I do it with RDDs? RDD.take returns an 
>> Array, not another RDD.
>> 
>> example Scala code:
>> import scala.util.Random
>> 
>> case class StudentScore(age: Int, num: Int, score: Int, name: Int)
>> 
>> val scores = for {
>>   i <- 1 to 1
>> } yield {
>>   StudentScore(Random.nextInt(100), Random.nextInt(100), Random.nextInt(), 
>> Random.nextInt())
>> }
>> 
>> 
>> def takeTop(scores: Seq[StudentScore], byKey: StudentScore => Int): 
>> Seq[(Int, Seq[StudentScore])] = {
>>   val groupedScore = scores.groupBy(byKey)
>>.map{case (_, _scores) => 
>> (_scores.foldLeft(0)((acc, v) => acc + v.score), _scores)}.toSeq
>>   groupedScore.sortBy(_._1).take(10)
>> }
>> 
>> val topScores = for {
>>   (_, ageScores) <- takeTop(scores, _.age)
>>   (_, numScores) <- takeTop(ageScores, _.num)
>> } yield {
>>   numScores
>> }
>> 
>> topScores.size
>> 
>> -- 
>> 
>> ~Yours, Xuefeng Wu/吴雪峰  敬上
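
For reference, here is a minimal sketch of the per-key bounded-queue idea from
Debasish's reply, written with a plain scala.collection.mutable.PriorityQueue
since Spark's own BoundedPriorityQueue is private to the spark package. The
StudentScore case class is from the example above; k = 10 and the grouping by
age are assumptions made to match the question.

import scala.collection.mutable
import org.apache.spark.SparkContext._        // pair-RDD implicits (Spark 1.1)
import org.apache.spark.rdd.RDD

case class StudentScore(age: Int, num: Int, score: Int, name: Int)

// Keep at most k records per key; the result stays an RDD, nothing is
// collected on the driver.
def topKByKey(rdd: RDD[(Int, StudentScore)], k: Int): RDD[(Int, Seq[StudentScore])] = {
  // Reverse ordering so dequeue() removes the record with the *smallest* score.
  implicit val ord: Ordering[StudentScore] = Ordering.by(s => -s.score)

  def add(q: mutable.PriorityQueue[StudentScore], s: StudentScore) = {
    q.enqueue(s)
    if (q.size > k) q.dequeue()               // drop the current minimum
    q
  }
  def merge(a: mutable.PriorityQueue[StudentScore],
            b: mutable.PriorityQueue[StudentScore]) = { b.foreach(add(a, _)); a }

  rdd.combineByKey(
    (s: StudentScore) => add(mutable.PriorityQueue.empty[StudentScore], s),
    add _,
    merge _
  ).mapValues(_.toSeq.sortBy(s => -s.score))
}

// Usage sketch: top 10 scores per age, still distributed.
// val top10PerAge = topKByKey(scores.map(s => (s.age, s)), 10)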


Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
This is a long shot, but...

I'm trying to load a bunch of files spread out over hdfs into an RDD, and
in most cases it works well, but for a few very large files, I exceed
available memory.  My current workflow basically works like this:

context.parallelize(fileNames).flatMap { file =>
  transform the file into a bunch of records
}

I'm wondering if there are any APIs to somehow "flush" the records of a big
dataset so I don't have to load them all into memory at once.  I know this
doesn't exist, but conceptually:

context.parallelize(fileNames).streamMap { (file, stream) =>
 for every 10K records write records to stream and flush
}

Keith


Passing Java Options to Spark AM launching

2014-12-01 Thread Mohammad Islam
Hi,
How do I pass Java options (such as "-XX:MaxMetaspaceSize=100M") when 
launching the AM or task containers?
This is related to running Spark on YARN (Hadoop 2.3.0). In the Map-Reduce case, 
setting a property such as 
"mapreduce.map.java.opts" would do the trick.
Any help would be highly appreciated.
Regards,
Mohammad



 

Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Andy Twigg
>
> file => tranform file into a bunch of records


What does this function do exactly? Does it load the file locally?
Spark supports RDDs exceeding global RAM (cf the terasort example), but if
your example just loads each file locally, then this may cause problems.
Instead, you should load each file into an rdd with context.textFile(),
flatmap that and union these rdds.

also see
http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files


On 1 December 2014 at 16:50, Keith Simmons  wrote:

> This is a long shot, but...
>
> I'm trying to load a bunch of files spread out over hdfs into an RDD, and
> in most cases it works well, but for a few very large files, I exceed
> available memory.  My current workflow basically works like this:
>
> context.parallelize(fileNames).flatMap { file =>
>   tranform file into a bunch of records
> }
>
> I'm wondering if there are any APIs to somehow "flush" the records of a
> big dataset so I don't have to load them all into memory at once.  I know
> this doesn't exist, but conceptually:
>
> context.parallelize(fileNames).streamMap { (file, stream) =>
>  for every 10K records write records to stream and flush
> }
>
> Keith
>
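
A minimal sketch of that suggestion for the case where the files are text (the
binary-format case is discussed further down the thread). The paths are
placeholders and parseLine stands in for whatever turns a line into records:

// Stand-in parser; replace with your own record logic.
def parseLine(line: String): Seq[String] = line.split("\\s+").toSeq

// Placeholder paths.
val fileNames = Seq("hdfs:///data/part-00001", "hdfs:///data/part-00002")

// One RDD per file, then a single union; files are read lazily on the
// executors instead of being materialised up front.
val perFile = fileNames.map(path => sc.textFile(path))
val records = sc.union(perFile).flatMap(parseLine)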


Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
Actually, I'm working with a binary format.  The api allows reading out a
single record at a time, but I'm not sure how to get those records into
spark (without reading everything into memory from a single file at once).



On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg  wrote:

> file => tranform file into a bunch of records
>
>
> What does this function do exactly? Does it load the file locally?
> Spark supports RDDs exceeding global RAM (cf the terasort example), but if
> your example just loads each file locally, then this may cause problems.
> Instead, you should load each file into an rdd with context.textFile(),
> flatmap that and union these rdds.
>
> also see
>
> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files
>
>
> On 1 December 2014 at 16:50, Keith Simmons  wrote:
>
>> This is a long shot, but...
>>
>> I'm trying to load a bunch of files spread out over hdfs into an RDD, and
>> in most cases it works well, but for a few very large files, I exceed
>> available memory.  My current workflow basically works like this:
>>
>> context.parallelize(fileNames).flatMap { file =>
>>   tranform file into a bunch of records
>> }
>>
>> I'm wondering if there are any APIs to somehow "flush" the records of a
>> big dataset so I don't have to load them all into memory at once.  I know
>> this doesn't exist, but conceptually:
>>
>> context.parallelize(fileNames).streamMap { (file, stream) =>
>>  for every 10K records write records to stream and flush
>> }
>>
>> Keith
>>
>
>


Re: Passing Java Options to Spark AM launching

2014-12-01 Thread Tobias Pfeiffer
Hi,

have a look at the documentation for spark.driver.extraJavaOptions (which
seems to have disappeared since I looked it up last week)
and spark.executor.extraJavaOptions at <
http://spark.apache.org/docs/latest/configuration.html#runtime-environment>.

Tobias
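
For illustration, a sketch of how those two settings are typically applied
(not taken from the thread): executor options can go into SparkConf, while the
driver/AM options need to be in place before the driver JVM starts, so they
are usually passed via spark-defaults.conf or on the spark-submit command
line.

import org.apache.spark.{SparkConf, SparkContext}

// Executor JVM options can be set programmatically before the context starts.
val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.executor.extraJavaOptions", "-XX:MaxMetaspaceSize=100M")
val sc = new SparkContext(conf)

// Driver options (and the AM in yarn-cluster mode) are picked up from
// spark-defaults.conf or from spark-submit, e.g.:
//   spark-submit --conf "spark.driver.extraJavaOptions=-XX:MaxMetaspaceSize=100M" ...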


Re: Passing Java Options to Spark AM launching

2014-12-01 Thread Mohammad Islam
Thanks Tobias for the answer. Does it work for the "driver" as well?
Regards,
Mohammad 

 On Monday, December 1, 2014 5:30 PM, Tobias Pfeiffer  
wrote:
   

 Hi,
have a look at the documentation for spark.driver.extraJavaOptions (which seems 
to have disappeared since I looked it up last week) and 
spark.executor.extraJavaOptions at 
.
Tobias



   

Re: Passing Java Options to Spark AM launching

2014-12-01 Thread Zhan Zhang
Please check whether 
https://github.com/apache/spark/pull/3409#issuecomment-64045677 solves the 
problem for launching the AM.

Thanks.

Zhan Zhang
On Dec 1, 2014, at 4:49 PM, Mohammad Islam  wrote:

> Hi,
> How to pass the Java options (such as "-XX:MaxMetaspaceSize=100M") when 
> lunching AM or task containers?
> 
> This is related to running Spark on Yarn (Hadoop 2.3.0). In Map-reduce case, 
> setting the property such as 
> "mapreduce.map.java.opts" would do the work.
> 
> Any help would be highly appreciated.
> 
> Regards,
> Mohammad
> 
> 
> 
> 
>  




Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Andy Twigg
Could you modify your function so that it streams through the files record
by record and outputs them to hdfs, then read them all in as RDDs and take
the union? That would only use bounded memory.

On 1 December 2014 at 17:19, Keith Simmons  wrote:

> Actually, I'm working with a binary format.  The api allows reading out a
> single record at a time, but I'm not sure how to get those records into
> spark (without reading everything into memory from a single file at once).
>
>
>
> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg  wrote:
>
>> file => tranform file into a bunch of records
>>
>>
>> What does this function do exactly? Does it load the file locally?
>> Spark supports RDDs exceeding global RAM (cf the terasort example), but
>> if your example just loads each file locally, then this may cause problems.
>> Instead, you should load each file into an rdd with context.textFile(),
>> flatmap that and union these rdds.
>>
>> also see
>>
>> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files
>>
>>
>> On 1 December 2014 at 16:50, Keith Simmons  wrote:
>>
>>> This is a long shot, but...
>>>
>>> I'm trying to load a bunch of files spread out over hdfs into an RDD,
>>> and in most cases it works well, but for a few very large files, I exceed
>>> available memory.  My current workflow basically works like this:
>>>
>>> context.parallelize(fileNames).flatMap { file =>
>>>   tranform file into a bunch of records
>>> }
>>>
>>> I'm wondering if there are any APIs to somehow "flush" the records of a
>>> big dataset so I don't have to load them all into memory at once.  I know
>>> this doesn't exist, but conceptually:
>>>
>>> context.parallelize(fileNames).streamMap { (file, stream) =>
>>>  for every 10K records write records to stream and flush
>>> }
>>>
>>> Keith
>>>
>>
>>
>


numpy arrays and spark sql

2014-12-01 Thread Joseph Winston
This works as expected in the 1.1 branch: 

from pyspark.sql import *

rdd = sc.parallelize([range(0, 10), range(10,20), range(20, 30)])

# define the schema
schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 
value10"
fields = [StructField(field_name, IntegerType(), True) for field_name in 
schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaRDD = sqlContext.applySchema(rdd, schema)

# Register the table
schemaRDD.registerTempTable("slice")

# SQL can be run over SchemaRDDs that have been registered as a table.
results = sqlContext.sql("SELECT value1 FROM slice")

# The results of SQL queries are RDDs and support all the normal RDD operations.
print results.collect()

However changing the rdd to use a numpy array fails:

import numpy as np
rdd = sc.parallelize(np.arange(20).reshape(2, 10))

# define the schema
schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 value9 
value10"
fields = [StructField(field_name, np.ndarray, True) for field_name in 
schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaRDD = sqlContext.applySchema(rdd, schema)

The error is:
Traceback (most recent call last):
  File "", line 2, in 
  File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 1119, in 
applySchema
_verify_type(row, schema)
  File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 735, in 
_verify_type
% (dataType, type(obj)))
TypeError: StructType(List(StructField(value1,,true),StructField(value2,,true),StructField(value3,,true),StructField(value4,,true),StructField(value5,,true),StructField(value6,,true),StructField(value7,,true),StructField(value8,,true),StructField(value9,,true),StructField(value10,,true))) can 
not accept abject in type 

I've tried np.int_ and np.int32 and they fail too. What type should I use to 
make numpy arrays work?
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
Yep, that's definitely possible.  It's one of the workarounds I was
considering.  I was just curious if there was a simpler (and perhaps more
efficient) approach.

Keith

On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg  wrote:

> Could you modify your function so that it streams through the files record
> by record and outputs them to hdfs, then read them all in as RDDs and take
> the union? That would only use bounded memory.
>
> On 1 December 2014 at 17:19, Keith Simmons  wrote:
>
>> Actually, I'm working with a binary format.  The api allows reading out a
>> single record at a time, but I'm not sure how to get those records into
>> spark (without reading everything into memory from a single file at once).
>>
>>
>>
>> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg  wrote:
>>
>>> file => tranform file into a bunch of records
>>>
>>>
>>> What does this function do exactly? Does it load the file locally?
>>> Spark supports RDDs exceeding global RAM (cf the terasort example), but
>>> if your example just loads each file locally, then this may cause problems.
>>> Instead, you should load each file into an rdd with context.textFile(),
>>> flatmap that and union these rdds.
>>>
>>> also see
>>>
>>> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files
>>>
>>>
>>> On 1 December 2014 at 16:50, Keith Simmons  wrote:
>>>
 This is a long shot, but...

 I'm trying to load a bunch of files spread out over hdfs into an RDD,
 and in most cases it works well, but for a few very large files, I exceed
 available memory.  My current workflow basically works like this:

 context.parallelize(fileNames).flatMap { file =>
   tranform file into a bunch of records
 }

 I'm wondering if there are any APIs to somehow "flush" the records of a
 big dataset so I don't have to load them all into memory at once.  I know
 this doesn't exist, but conceptually:

 context.parallelize(fileNames).streamMap { (file, stream) =>
  for every 10K records write records to stream and flush
 }

 Keith

>>>
>>>
>>
>


Re: ALS failure with size > Integer.MAX_VALUE

2014-12-01 Thread Bharath Ravi Kumar
Yes, the issue appears to be due to the 2GB block size limitation. I am
hence looking for (user, product) block sizing suggestions to work around
the block size limitation.

On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen  wrote:

> (It won't be that, since you see that the error occur when reading a
> block from disk. I think this is an instance of the 2GB block size
> limitation.)
>
> On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya
>  wrote:
> > Hi Bharath – I’m unsure if this is your problem but the
> > MatrixFactorizationModel in MLLIB which is the underlying component for
> ALS
> > expects your User/Product fields to be integers. Specifically, the input
> to
> > ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am
> wondering if
> > perhaps one of your identifiers exceeds MAX_INT, could you write a quick
> > check for that?
> >
> > I have been running a very similar use case to yours (with more
> constrained
> > hardware resources) and I haven’t seen this exact problem but I’m sure
> we’ve
> > seen similar issues. Please let me know if you have other questions.
> >
> > From: Bharath Ravi Kumar 
> > Date: Thursday, November 27, 2014 at 1:30 PM
> > To: "user@spark.apache.org" 
> > Subject: ALS failure with size > Integer.MAX_VALUE
> >
> > We're training a recommender with ALS in mllib 1.1 against a dataset of
> 150M
> > users and 4.5K items, with the total number of training records being 1.2
> > Billion (~30GB data). The input data is spread across 1200 partitions on
> > HDFS. For the training, rank=10, and we've configured {number of user
> data
> > blocks = number of item data blocks}. The number of user/item blocks was
> > varied  between 50 to 1200. Irrespective of the block size (e.g. at 1200
> > blocks each), there are atleast a couple of tasks that end up shuffle
> > reading > 9.7G each in the aggregate stage (ALS.scala:337) and failing
> with
> > the following exception:
> >
> > java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
> > at
> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
> > at
> org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
> > at
> >
> org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
> > at
> >
> org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
> >
>
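
For what it's worth, a sketch of the block-count knob being discussed, against
the MLlib 1.1 API. The idea is to keep raising the number of user/item blocks
until the largest shuffle read per task stays comfortably under the 2 GB block
limit; the concrete numbers below are illustrative only, and ratings is
assumed to be the existing RDD[Rating].

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD

// ratings: RDD[Rating] built elsewhere from the ~1.2B training records.
def trainModel(ratings: RDD[Rating]) = {
  val rank = 10
  val iterations = 10
  val lambda = 0.01
  // More blocks => more, smaller shuffle blocks; each must stay < 2 GB.
  val numBlocks = 2400      // illustrative; the thread already tried 50-1200
  ALS.train(ratings, rank, iterations, lambda, numBlocks)
}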


Re: Calling spark from a java web application.

2014-12-01 Thread ryaminal
If you are able to use YARN in your hadoop cluster, then the following
technique is pretty straightforward:
http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/

We use this in our system and it's super easy to execute from our Tomcat
application.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Calling-spark-from-a-java-web-application-tp20007p20145.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Andy Twigg
You may be able to construct RDDs directly from an iterator - not sure
- you may have to subclass RDD yourself.
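
One concrete variant of this, staying within the existing API: flatMap accepts
any TraversableOnce, so if the per-file reader can be wrapped in a lazy
Iterator, Spark pulls records one at a time and the whole file's records are
never materialised in memory at once. The openReader/readRecord names below
are placeholders for whatever the binary format's API actually provides.

// Sketch only; RecordReader / openReader stand in for your format's API.
trait RecordReader { def readRecord(): Array[Byte]; def close(): Unit }
def openReader(path: String): RecordReader = ???   // placeholder

val records = sc.parallelize(fileNames, fileNames.size).flatMap { file =>
  val reader = openReader(file)
  // Lazy iterator: records are produced on demand, not all at once.
  // (Closing the reader when the iterator is exhausted is omitted for brevity.)
  Iterator.continually(reader.readRecord()).takeWhile(_ != null)
}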

On 1 December 2014 at 18:40, Keith Simmons  wrote:
> Yep, that's definitely possible.  It's one of the workarounds I was
> considering.  I was just curious if there was a simpler (and perhaps more
> efficient) approach.
>
> Keith
>
> On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg  wrote:
>>
>> Could you modify your function so that it streams through the files record
>> by record and outputs them to hdfs, then read them all in as RDDs and take
>> the union? That would only use bounded memory.
>>
>> On 1 December 2014 at 17:19, Keith Simmons  wrote:
>>>
>>> Actually, I'm working with a binary format.  The api allows reading out a
>>> single record at a time, but I'm not sure how to get those records into
>>> spark (without reading everything into memory from a single file at once).
>>>
>>>
>>>
>>> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg  wrote:
>
> file => tranform file into a bunch of records


 What does this function do exactly? Does it load the file locally?
 Spark supports RDDs exceeding global RAM (cf the terasort example), but
 if your example just loads each file locally, then this may cause problems.
 Instead, you should load each file into an rdd with context.textFile(),
 flatmap that and union these rdds.

 also see

 http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files


 On 1 December 2014 at 16:50, Keith Simmons  wrote:
>
> This is a long shot, but...
>
> I'm trying to load a bunch of files spread out over hdfs into an RDD,
> and in most cases it works well, but for a few very large files, I exceed
> available memory.  My current workflow basically works like this:
>
> context.parallelize(fileNames).flatMap { file =>
>   tranform file into a bunch of records
> }
>
> I'm wondering if there are any APIs to somehow "flush" the records of a
> big dataset so I don't have to load them all into memory at once.  I know
> this doesn't exist, but conceptually:
>
> context.parallelize(fileNames).streamMap { (file, stream) =>
>  for every 10K records write records to stream and flush
> }
>
> Keith


>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: numpy arrays and spark sql

2014-12-01 Thread Davies Liu
applySchema() only accepts an RDD of Row/list/tuple; it does not work with
numpy.array.

After applySchema(), the Python RDD will be pickled and unpickled in the
JVM, so you will not get any benefit from using numpy.array.

It will work if you convert ndarray into list:

schemaRDD = sqlContext.applySchema(rdd.map(list), schema)

On Mon, Dec 1, 2014 at 6:33 PM, Joseph Winston  wrote:
> This works as expected in the 1.1 branch:
>
> from pyspark.sql import *
>
> rdd = sc.parallelize([range(0, 10), range(10,20), range(20, 30)]
>
> # define the schema
> schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 
> value9 value10"
> fields = [StructField(field_name, IntegerType(), True) for field_name in 
> schemaString.split()]
> schema = StructType(fields)
>
> # Apply the schema to the RDD.
> schemaRDD = sqlContext.applySchema(rdd, schema)
>
> # Register the table
> schemaRDD.registerTempTable("slice")
>
> # SQL can be run over SchemaRDDs that have been registered as a table.
> results = sqlContext.sql("SELECT value1 FROM slice")
>
> # The results of SQL queries are RDDs and support all the normal RDD 
> operations.
> print results.collect()
>
> However changing the rdd to use a numpy array fails:
>
> import np as np
> rdd = sc.parallelize(np.arange(20).reshape(2, 10))
>
> # define the schema
> schemaString = "value1 value2 value3 value4 value5 value6 value7 value8 
> value9 value10"
> fields = [StructField(field_name, np.ndarray, True) for field_name in 
> schemaString.split()]
> schema = StructType(fields)
>
> # Apply the schema to the RDD.
> schemaRDD = sqlContext.applySchema(rdd, schema)
>
> The error is:
> Traceback (most recent call last):
>   File "", line 2, in 
>   File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 1119, in 
> applySchema
> _verify_type(row, schema)
>   File "/Users/jbw/src/Remote/GIT/spark/python/pyspark/sql.py", line 735, in 
> _verify_type
> % (dataType, type(obj)))
> TypeError: StructType(List(StructField(value1, 'numpy.ndarray'>,true),StructField(value2, 'numpy.ndarray'>,true),StructField(value3, 'numpy.ndarray'>,true),StructField(value4, 'numpy.ndarray'>,true),StructField(value5, 'numpy.ndarray'>,true),StructField(value6, 'numpy.ndarray'>,true),StructField(value7, 'numpy.ndarray'>,true),StructField(value8, 'numpy.ndarray'>,true),StructField(value9, 'numpy.ndarray'>,true),StructField(value10,,true))) can 
> not accept abject in type 
>
> I’ve tried np.int_ and np.int32 and they fail too.  What type should I use to 
> make a numpy arrays work?
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.io.IOException: Filesystem closed

2014-12-01 Thread rapelly kartheek
Hi,

I face the following exception when I submit a Spark application. The log
file shows:

14/12/02 11:52:58 ERROR LiveListenerBus: Listener EventLoggingListener
threw an exception
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
at
org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1668)
at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1629)
at org.apache.hadoop.hdfs.DFSOutputStream.sync(DFSOutputStream.java:1614)
at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:120)
at
org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
at
org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.util.FileLogger.flush(FileLogger.scala:158)
at
org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:87)
at
org.apache.spark.scheduler.EventLoggingListener.onJobEnd(EventLoggingListener.scala:112)
at
org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
at
org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
at
org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
at
org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)
at
org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:52)
at
org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
at
org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)

Someone please help me resolve this!!

Thanks


Re: java.io.IOException: Filesystem closed

2014-12-01 Thread Akhil Das
What is the application that you are submitting? Looks like you might have
invoked fs inside the app and then closed it within it.

Thanks
Best Regards

On Tue, Dec 2, 2014 at 11:59 AM, rapelly kartheek 
wrote:

> Hi,
>
> I face the following exception when submit a spark application. The log
> file shows:
>
> 14/12/02 11:52:58 ERROR LiveListenerBus: Listener EventLoggingListener
> threw an exception
> java.io.IOException: Filesystem closed
> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
> at
> org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1668)
> at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1629)
> at org.apache.hadoop.hdfs.DFSOutputStream.sync(DFSOutputStream.java:1614)
> at
> org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:120)
> at
> org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
> at
> org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.util.FileLogger.flush(FileLogger.scala:158)
> at
> org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:87)
> at
> org.apache.spark.scheduler.EventLoggingListener.onJobEnd(EventLoggingListener.scala:112)
> at
> org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
> at
> org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
> at
> org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
> at
> org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)
> at
> org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:52)
> at
> org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)
> at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
> at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)
> at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
> at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
> at
> org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)
>
> Someone please help me resolve this!!
>
> Thanks
>
>
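
If that is indeed the cause, the usual culprit is that FileSystem.get() hands
back a JVM-wide cached instance, so closing it in application code also closes
the handle Spark's EventLoggingListener is flushing to. A hedged sketch of the
anti-pattern and one workaround (the path is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val hadoopConf = new Configuration()

// Anti-pattern: this returns the shared, cached FileSystem, and closing it
// breaks everything else in the JVM that holds the same instance.
//   val fs = FileSystem.get(hadoopConf)
//   fs.close()

// Workaround: ask for a private instance and close only that one.
val fs = FileSystem.newInstance(hadoopConf)
try {
  println(fs.exists(new Path("/user/data")))   // placeholder path
} finally {
  fs.close()
}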