Insert a JavaPairDStream into multiple Cassandra tables on the basis of key.

2016-11-02 Thread Abhishek Anand
Hi All,

I have a JavaPairDStream that I want to insert into multiple Cassandra
tables on the basis of key. One approach is to filter on each key and then
insert the result into the corresponding Cassandra table, but this would call
the filter operation "n" times, one pass per key.

Is there any better approach to achieve this more quickly?
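
For reference, a rough single-pass sketch (written in Scala against a
DStream[(String, String)], assuming the DataStax spark-cassandra-connector;
the keyspace "ks" and the per-key table names are made up):

import com.datastax.spark.connector.cql.CassandraConnector

pairDStream.foreachRDD { rdd =>
  val connector = CassandraConnector(rdd.sparkContext.getConf)  // serializable, created on the driver
  rdd.foreachPartition { partition =>
    connector.withSessionDo { session =>
      partition.foreach { case (key, value) =>
        // a real implementation would cache one prepared statement per target table
        session.execute(s"INSERT INTO ks.table_$key (k, v) VALUES ('$key', '$value')")
      }
    }
  }
}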

Thanks
Abhi


distribute partitions evenly to my cluster

2016-11-02 Thread heather79
Hi, I have a cluster with 4 nodes (12 cores per node). I want to split my
dataset into 24 partitions and allocate 6 partitions per node. However, I found
that I got 12 partitions on each of 2 nodes and 0 partitions on the other 2
nodes. Does anyone have an idea of how to get 6 partitions per node? Is that
possible?
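
For reference, a sketch of the knobs involved (values are hypothetical;
--num-executors is the YARN flag, standalone uses spark.cores.max instead).
The 24 partitions can only spread over all four nodes if executors are actually
running on all four, and YARN does not strictly guarantee one executor per node:

// e.g. spark-submit --num-executors 4 --executor-cores 6 --executor-memory 8G ...
val data = rawRdd.repartition(24)   // force 24 partitions; the shuffle spreads them
                                    // across whichever executors currently exist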
Thanks






Re: mapwithstate Hangs with Error cleaning broadcast

2016-11-02 Thread manasdebashiskar
Yes.
In my case, my StateSpec had too few partitions. I increased numPartitions
and the problem went away. (The details of why the problem was happening in
the first place are elided.)

TL;DR:
StateSpec takes a numPartitions setting, which can be set to a high enough
number.
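
That is, something along these lines (the mapping function and the value 200
are placeholders):

val spec = StateSpec.function(mappingFunc).numPartitions(200)
val stateStream = keyedDStream.mapWithState(spec)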






RuntimeException: Null value appeared in non-nullable field when holding Optional Case Class

2016-11-02 Thread Aniket Bhatnagar
Hi all

I am running into a runtime exception when a Dataset holds a None value for
an Option of a case class that has a non-nullable field. For instance, if we
have the following case class:

case class DataRow(id: Int, value: String)

Then, Dataset[Option[DataRow]] can only hold Some(DataRow) objects and
cannot hold None. If it does, the following exception is thrown:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 6 in stage 0.0 failed 1 times, most recent failure:
Lost task 6.0 in stage 0.0 (TID 6, localhost): java.lang.RuntimeException:
Null value appeared in non-nullable field:
- field (class: "scala.Int", name: "id")
- option value class:
"com.aol.advertising.dmp.audscale.uts.DataSetOptBug.DataRow"
- root class: "scala.Option"
If the schema is inferred from a Scala tuple/case class, or a Java bean,
please try to use scala.Option[_] or other nullable types (e.g.
java.lang.Integer instead of int/scala.Int).


I am attaching a sample program to reproduce this. Is this a known
limitation or a bug?
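
A minimal sketch of the kind of code that triggers it (assuming a SparkSession
named spark):

import spark.implicits._
case class DataRow(id: Int, value: String)
val ds = Seq(Some(DataRow(1, "a")), None).toDS()   // Dataset[Option[DataRow]]
ds.show()   // the None element raises "Null value appeared in non-nullable field"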

Thanks,
Aniket

Full stack trace:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 6 in stage 0.0 failed 1 times, most recent failure:
Lost task 6.0 in stage 0.0 (TID 6, localhost): java.lang.RuntimeException:
Null value appeared in non-nullable field:
- field (class: "scala.Int", name: "id")
- option value class: "DataSetOptBug.DataRow"
- root class: "scala.Option"
If the schema is inferred from a Scala tuple/case class, or a Java bean,
please try to use scala.Option[_] or other nullable types (e.g.
java.lang.Integer instead of int/scala.Int).
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


DataSetOptBug.scala
Description: Binary data


Re: Quirk in how Spark DF handles JSON input records?

2016-11-02 Thread Michael Segel

On Nov 2, 2016, at 2:22 PM, Daniel Siegmann <dsiegm...@securityscorecard.io> wrote:

Yes, it needs to be on a single line. Spark (or Hadoop really) treats newlines 
as a record separator by default. While it is possible to use a different 
string as a record separator, what would you use in the case of JSON?

If you do some Googling I suspect you'll find some possible solutions. 
Personally, I would just use a separate JSON library (e.g. json4s) to parse 
this metadata into an object, rather than trying to read it in through Spark.


Yeah, that’s the basic idea.

This JSON is metadata to help drive the process, not row records… although the
column descriptors are row records, so in the short term I could cheat and just
store those in a file.

:-(

--
Daniel Siegmann
Senior Software Engineer
SecurityScorecard Inc.
214 W 29th Street, 5th Floor
New York, NY 10001



Re: Quirk in how Spark DF handles JSON input records?

2016-11-02 Thread Daniel Siegmann
Yes, it needs to be on a single line. Spark (or Hadoop really) treats
newlines as a record separator by default. While it is possible to use a
different string as a record separator, what would you use in the case of
JSON?

If you do some Googling I suspect you'll find some possible solutions.
Personally, I would just use a separate JSON library (e.g. json4s) to parse
this metadata into an object, rather than trying to read it in through
Spark.
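
For the metadata case, a rough sketch with json4s (the case class names here
are made up to mirror the JSON from the other thread):

import org.json4s._
import org.json4s.jackson.JsonMethods._

case class ColumnDef(col_name: String, data_type: String)
case class SourceMeta(source: String, table: String, compression: String)
case class IngestionSchema(metadata: SourceMeta, columns: List[ColumnDef])

implicit val formats: Formats = DefaultFormats
val text   = scala.io.Source.fromFile("/path/to/Chicago_Crimes.json").mkString
val schema = parse(text).extract[IngestionSchema]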

--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


BiMap BroadCast Variable - Kryo Serialization Issue

2016-11-02 Thread Kalpana Jalawadi
Hi,

I am getting a NullPointerException due to a Kryo serialization issue while
trying to read a BiMap broadcast variable. Attached are the code snippets.
The pointers shared here didn't help: link1, link2.
The Spark version used is 1.6.x, but this was working with version 1.3.x.

Any help in this regard is much appreciated.
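
For reference, one commonly suggested workaround is to make Kryo fall back to
Java serialization for the Guava HashBiMap class via a custom registrator; a
rough sketch (the registrator class name is made up, and this is untested
against the attached code):

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import com.google.common.collect.HashBiMap
import org.apache.spark.serializer.KryoRegistrator

class BiMapRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Kryo's default map serializer trips over HashBiMap's internals,
    // so delegate that one class to Java serialization.
    kryo.register(classOf[HashBiMap[_, _]], new JavaSerializer())
  }
}

// and in the SparkConf:
// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// conf.set("spark.kryo.registrator", "com.demo.BiMapRegistrator")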

Exception:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)
App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1238)
App > at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
App > at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
App > at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
App > at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
App > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
App > at
com.manthan.aaas.algo.associationmining.impl.Test.lambda$execute$6abf5fd0$1(Test.java:39)
App > at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
App > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
App > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
App > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
App > at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
App > at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
App > at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
App > at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:273)
App > at scala.collection.AbstractIterator.to(Iterator.scala:1157)
App > at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
App > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
App > at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
App > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
App > at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
App > at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
App > at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978)
App > at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978)
App > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
App > at org.apache.spark.scheduler.Task.run(Task.scala:89)
App > at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
App > at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
App > at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
App > at java.lang.Thread.run(Thread.java:745)
App > Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
App > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
App > at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
App > at
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
App > at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231)
App > ... 29 more
App > Caused by: java.lang.NullPointerException
App > at com.google.common.collect.HashBiMap.seekByKey(HashBiMap.java:180)
App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:230)
App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:218)
App > at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
App > at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
App > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
App > ... 35 more
App >
App > 16/11/02 18:39:01 dispatcher-event-loop-2 INFO TaskSetManager:
Starting task 17.1 in stage 1.0 (TID 19, ip-10-0-1-237.ec2.internal,
partition 17,PROCESS_LOCAL, 2076 bytes)
App > 16/11/02 18:39:01 task-result-getter-3 INFO TaskSetManager: Lost task
17.1 in stage 1.0 (TID 19) on executor ip-10-0-1-237.ec2.internal:
java.io.IOException (com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException
App > Serialization trace:
App > val

Re: Quirk in how Spark DF handles JSON input records?

2016-11-02 Thread Michael Segel
ARGH!!

Looks like a formatting issue.  Spark doesn’t like ‘pretty’ output.

So then the entire record which defines the schema has to be a single line?

Really?
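
For what it's worth, a sketch of one way around the one-JSON-object-per-line
expectation in 2.0.x (the path is hypothetical; later Spark releases also added
a multiLine option to the JSON reader):

val raw = spark.sparkContext
  .wholeTextFiles("/path/to/Chicago_Crimes.json")
  .values                        // RDD[String], one element per file
val df = spark.read.json(raw)    // each element is parsed as one JSON record
df.printSchema()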




Quirk in how Spark DF handles JSON input records?

2016-11-02 Thread Michael Segel
This may be a silly mistake on my part…

Doing an example using Chicago’s Crime data.. (There’s a lot of it going 
around. ;-)

The goal is to read a file containing a JSON record that describes the crime 
data.csv for ingestion into a data frame, then I want to output to a Parquet 
file.
(Pretty simple right?)

I ran this both in Zeppelin and in the Spark-Shell (2.01)

// Setup of environment
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value").getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

// Load the JSON from file:
val df = spark.read.json("~/datasets/Chicago_Crimes.json")
df.show()


The output
df: org.apache.spark.sql.DataFrame = [_corrupt_record: string]
++
| _corrupt_record|
++
| {|
| "metadata": {|
| "source": "CSV_...|
| "table": "Chica...|
| "compression": ...|
| },|
| "columns": [{|
| "col_name": "Id",|
| "data_type": "I...|
| }, {|
| "col_name": "Ca...|
| "data_type": "B...|
| }, {|

I checked the JSON file against a JSONLint tool (two actually)
My JSON record is valid w no errors. (see below)

So what’s happening?  What am I missing?
The goal is to create an ingestion schema for each source. From this I can 
build the schema for the Parquet file or other data target.

Thx

-Mike

My JSON record:
{
"metadata": {
"source": "CSV_FILE",
"table": "Chicago_Crime",
"compression": "SNAPPY"
},
"columns": [{
"col_name": "Id",
"data_type": "INT64"
}, {
"col_name": "Case_No.",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "Date",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "Block",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "IUCR",
"data_type": "INT32"
}, {
"col_name": "Primary_Type",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "Description",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "Location_Description",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "Arrest",
"data_type": "BOOLEAN"
}, {
"col_name": "Domestic",
"data_type": "BOOLEAN"
}, {
"col_name": "Beat",
"data_type": "BYTE_ARRAYI"
}, {
"col_name": "District",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "Ward",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "Community",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "FBI_Code",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "X_Coordinate",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "Y_Coordinate",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "Year",
"data_type": "INT32"
}, {
"col_name": "Updated_On",
"data_type": "BYTE_ARRAY"
}, {
"col_name": "Latitude",
"data_type": "DOUBLE"
}, {
"col_name": "Longitude",
"data_type": "DOUBLE"
}, {
"col_name": "Location",
"data_type": "BYTE_ARRAY"


}]
}




Re: Spark ML - Is it rule of thumb that all Estimators should only be Fit on Training data

2016-11-02 Thread Sean Owen
I would also only fit these on training data. There are probably some
corner cases where letting these ancillary transforms see test data results
in a target leak. Though I can't really think of a good example.

More to the point, you're probably fitting these as part of a pipeline and
that pipeline as a whole is only fed with training data during model
building.
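
In code terms, the pattern looks something like this (a sketch; the column
names and the featurized train/test DataFrames are hypothetical):

import org.apache.spark.ml.feature.IDF

val idf      = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(trainingFeaturized)        // document frequencies come from training data only
val trainVec = idfModel.transform(trainingFeaturized)
val testVec  = idfModel.transform(testFeaturized) // reuse the fitted model; no refit on test data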

On Wed, Nov 2, 2016 at 6:05 PM Nirav Patel  wrote:

> It is very clear that for ML algorithms (classification, regression) that
> Estimator only fits on training data but it's not very clear of other
> estimators like IDF for example.
> IDF is a feature transformation model but having IDF estimator and
> transformer makes it little confusing that what exactly it does in Fitting
> on one dataset vs Transforming on another dataset.
>
>
>


Spark ML - Is it rule of thumb that all Estimators should only be Fit on Training data

2016-11-02 Thread Nirav Patel
It is very clear that for ML algorithms (classification, regression) the
Estimator only fits on training data, but it's not very clear for other
estimators like IDF, for example.
IDF is a feature transformation model, but having both an IDF estimator and
transformer makes it a little confusing what exactly it does when fitting on
one dataset vs. transforming on another dataset.




Re: How to return a case class in map function?

2016-11-02 Thread Michael Armbrust
That's a bug. Which version of Spark are you running? Have you tried 2.0.2?

On Wed, Nov 2, 2016 at 12:01 AM, 颜发才(Yan Facai)  wrote:

> Hi, all.
> When I use a case class as return value in map function, spark always
> raise a ClassCastException.
>
> I write an demo, like:
>
> scala> case class Record(key: Int, value: String)
>
> scala> case class ID(key: Int)
>
> scala> val df = Seq(Record(1, "a"), Record(2, "b")).toDF
>
> scala> df.map{x => ID(x.getInt(0))}.show
>
> 16/11/02 14:52:34 ERROR Executor: Exception in task 0.0 in stage 166.0
> (TID 175)
> java.lang.ClassCastException: $line1401.$read$$iw$$iw$ID cannot be cast to
> $line1401.$read$$iw$$iw$ID
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown Source)
>
>
> Please tell me if I'm wrong.
> Thanks.
>
>


Re: error: Unable to find encoder for type stored in a Dataset. when trying to map through a DataFrame

2016-11-02 Thread Michael Armbrust
Spark doesn't know how to turn a Seq[Any] back into a row.  You would need
to create a case class or something where we can figure out the schema.
What are you trying to do?

If you don't care about specific fields and you just want to serialize the
type, you can use kryo:

implicit val anyEncoder = Encoders.kryo[Seq[Any]]
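
For example (a short usage sketch):

import org.apache.spark.sql.{Encoder, Encoders}
implicit val anyEncoder: Encoder[Seq[Any]] = Encoders.kryo[Seq[Any]]
val ds = df.map(r => r.toSeq)   // now encoded with Kryo instead of failing to find an encoder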

On Wed, Nov 2, 2016 at 9:57 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I have the following scenario:
>
> scala> val df = spark.sql("select * from danieltest3")
> df: org.apache.spark.sql.DataFrame = [iid: string, activity: string ... 34
> more fields]
>
> Now I'm trying to map through the rows I'm getting:
> scala> df.map(r=>r.toSeq)
> :32: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing spark.implicits._  Support for serializing other
> types will be added in future releases.
>df.map(r=>r.toSeq)
>
>
> What am I missing here ?
>
> Thank you,
> Daniel
>


Re: Spark ML - CrossValidation - How to get Evaluation metrics of best model

2016-11-02 Thread Nirav Patel
Thanks!

On Tue, Nov 1, 2016 at 6:30 AM, Sean Owen  wrote:

> CrossValidator splits the data into k sets, and then trains k times,
> holding out one subset for cross-validation each time. You are correct that
> you should actually withhold an additional test set, before you use
> CrossValidator, in order to get an unbiased estimate of the best model's
> performance.
>
> On Tue, Nov 1, 2016 at 12:10 PM Nirav Patel  wrote:
>
>> I am running classification model. with normal training-test split I can
>> check model accuracy and F1 score using MulticlassClassificationEvaluator.
>> How can I do this with CrossValidation approach?
>> Afaik, you Fit entire sample data in CrossValidator as you don't want to
>> leave out any observation from either testing or training. But by doing so
>> I don't have anymore unseen data on which I can run finalized model on. So
>> is there a way I can get Accuracy and F1 score of a best model resulted
>> from cross validation?
>> Or should I still split sample data in to training and test before
>> running cross validation against only training data? so later I can test it
>> against test data.
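
A sketch of the split-then-cross-validate flow described above (pipeline,
paramGrid and data are hypothetical placeholders):

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator

// Hold out a test set first, cross-validate on the rest, then evaluate the
// best model once on the untouched test set.
val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)

val evaluator = new MulticlassClassificationEvaluator().setMetricName("f1")
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(train)                        // best model chosen on training data only
val testF1  = evaluator.evaluate(cvModel.transform(test))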




Re: Custom receiver for WebSocket in Spark not working

2016-11-02 Thread kant kodali
I don't see a store() call in your receive().

Search for store() in http://spark.apache.org/docs/latest/streaming-custom-receivers.html
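
That is, inside WebSocketReceiver.receive() the listener should hand each
message to Spark; a sketch based on the code quoted below:

connection.listener(new TextListener {
  override def onMessage(message: String): Unit = {
    store(message)   // buffers the message so Spark can turn it into DStream records
  }
})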

On Wed, Nov 2, 2016 at 10:23 AM, Cassa L  wrote:

> Hi,
> I am using spark 1.6. I wrote a custom receiver to read from WebSocket.
> But when I start my spark job, it  connects to the WebSocket but  doesn't
> get any message. Same code, if I write as separate scala class, it works
> and prints messages from WebSocket. Is anything missing in my Spark Code?
> There are no errors in spark console.
>
> Here is my receiver -
>
> import org.apache.spark.Logging
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.receiver.Receiver
> import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}
>
> /**
>   * Custom receiver for WebSocket
>   */
> class WebSocketReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) 
> with Runnable with Logging {
>
>   private var webSocket: WebSocket = _
>
>   @transient
>   private var thread: Thread = _
>
>   override def onStart(): Unit = {
> thread = new Thread(this)
> thread.start()
>   }
>
>   override def onStop(): Unit = {
> setWebSocket(null)
> thread.interrupt()
>   }
>
>   override def run(): Unit = {
> println("Received ")
> receive()
>   }
>
>   private def receive(): Unit = {
>
>
> val connection = WebSocket().open("ws://localhost:3001")
> println("WebSocket  Connected ..." )
> println("Connected --- " + connection)
> setWebSocket(connection)
>
>connection.listener(new TextListener {
>
>  override def onMessage(message: String) {
>  System.out.println("Message in Spark client is --> " + 
> message)
>}
> })
>
>
> }
>
> private def setWebSocket(newWebSocket: WebSocket) = synchronized {
> if (webSocket != null) {
> webSocket.shutDown
> }
> webSocket = newWebSocket
> }
>
> }
>
>
> =
>
> Here is code for Spark job
>
>
> object WebSocketTestApp {
>
>   def main(args: Array[String]) {
> val conf = new SparkConf()
>   .setAppName("Test Web Socket")
>   .setMaster("local[20]")
>   .set("test", "")
> val ssc = new StreamingContext(conf, Seconds(5))
>
>
> val stream: ReceiverInputDStream[String] = ssc.receiverStream(new 
> WebSocketReceiver())
> stream.print()
>
> ssc.start()
> ssc.awaitTermination()
>   }
>
>
> ==
> }
>
>
> Thanks,
>
> LCassa
>
>


Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-11-02 Thread Michael David Pedersen
Awesome, thank you Michael for the detailed example!

I'll look into whether I can use this approach for my use case. If so, I
could avoid the overhead of repeatedly registering a temp table for one-off
queries, instead registering the table once and relying on the injected
strategy. I don't know how much of an impact this overhead has in practice,
though.

Cheers,
Michael


Re: Load whole ALS MatrixFactorizationModel into memory

2016-11-02 Thread Sean Owen
You can cause the underlying RDDs in the model to be cached in memory. That
would be necessary but not sufficient to make it go fast; it should at
least get rid of a lot of I/O. I think making recommendations one at a time
is never going to scale to moderate load this way; one request means one
entire job to schedule with multiple tasks. Fine for the occasional query
or smallish data, but not a thousand queries per second. For that I think
you'd have to build some custom scoring infrastructure. At least, that's
what I did, so I would say that.
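
Concretely, something along these lines (the path is hypothetical):

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.storage.StorageLevel

val model = MatrixFactorizationModel.load(sc, "/path/to/model")
// persist() only marks the RDDs; an action such as count() materializes them in memory
model.userFeatures.persist(StorageLevel.MEMORY_ONLY).count()
model.productFeatures.persist(StorageLevel.MEMORY_ONLY).count()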


On Wed, Nov 2, 2016 at 4:54 PM Mikael Ståldal 
wrote:

> import org.apache.spark.mllib.recommendation.ALS
> import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
>
>
> I build a MatrixFactorizationModel with ALS.trainImplicit(), then I save
> it with its save method.
>
> Later, in an other process on another machine, I load the model with
> MatrixFactorizationModel.load(). Now I want to make a lot of
> recommendProducts() calls on the loaded model, and I want them to be quick,
> without any I/O. However, they are slow and seem to to I/O each time.
>
> Is there any way to force loading the whole model into memory (that step
> can take some time and do I/O) and then be able to do recommendProducts()
> on it multiple times, quickly without I/O?
>
> --
> [image: MagineTV]
>
> *Mikael Ståldal*
> Senior software developer
>
> *Magine TV*
> mikael.stal...@magine.com
> Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com
>
> Privileged and/or Confidential Information may be contained in this
> message. If you are not the addressee indicated in this message
> (or responsible for delivery of the message to such a person), you may not
> copy or deliver this message to anyone. In such case,
> you should destroy this message and kindly notify the sender by reply
> email.
>


Custom receiver for WebSocket in Spark not working

2016-11-02 Thread Cassa L
Hi,
I am using Spark 1.6. I wrote a custom receiver to read from a WebSocket, but
when I start my Spark job, it connects to the WebSocket but doesn't get any
message. The same code, written as a separate Scala class, works and prints
messages from the WebSocket. Is anything missing in my Spark code? There are
no errors in the Spark console.

Here is my receiver -

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.jfarcand.wcs.{MessageListener, TextListener, WebSocket}

/**
  * Custom receiver for WebSocket
  */
class WebSocketReceiver extends
Receiver[String](StorageLevel.MEMORY_ONLY) with Runnable with Logging
{

  private var webSocket: WebSocket = _

  @transient
  private var thread: Thread = _

  override def onStart(): Unit = {
thread = new Thread(this)
thread.start()
  }

  override def onStop(): Unit = {
setWebSocket(null)
thread.interrupt()
  }

  override def run(): Unit = {
println("Received ")
receive()
  }

  private def receive(): Unit = {


val connection = WebSocket().open("ws://localhost:3001")
println("WebSocket  Connected ..." )
println("Connected --- " + connection)
setWebSocket(connection)

   connection.listener(new TextListener {

 override def onMessage(message: String) {
 System.out.println("Message in Spark client is --> " + message)
   }
})


}

private def setWebSocket(newWebSocket: WebSocket) = synchronized {
if (webSocket != null) {
webSocket.shutDown
}
webSocket = newWebSocket
}

}


=

Here is code for Spark job


object WebSocketTestApp {

  def main(args: Array[String]) {
val conf = new SparkConf()
  .setAppName("Test Web Socket")
  .setMaster("local[20]")
  .set("test", "")
val ssc = new StreamingContext(conf, Seconds(5))


val stream: ReceiverInputDStream[String] = ssc.receiverStream(new
WebSocketReceiver())
stream.print()

ssc.start()
ssc.awaitTermination()
  }


==
}


Thanks,

LCassa


error: Unable to find encoder for type stored in a Dataset. when trying to map through a DataFrame

2016-11-02 Thread Daniel Haviv
Hi,
I have the following scenario:

scala> val df = spark.sql("select * from danieltest3")
df: org.apache.spark.sql.DataFrame = [iid: string, activity: string ... 34
more fields]

Now I'm trying to map through the rows I'm getting:
scala> df.map(r=>r.toSeq)
:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
   df.map(r=>r.toSeq)


What am I missing here ?

Thank you,
Daniel


Load whole ALS MatrixFactorizationModel into memory

2016-11-02 Thread Mikael Ståldal
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel


I build a MatrixFactorizationModel with ALS.trainImplicit(), then I save it
with its save method.

Later, in another process on another machine, I load the model with
MatrixFactorizationModel.load(). Now I want to make a lot of
recommendProducts() calls on the loaded model, and I want them to be quick,
without any I/O. However, they are slow and seem to do I/O each time.

Is there any way to force loading the whole model into memory (that step
can take some time and do I/O) and then be able to do recommendProducts()
on it multiple times, quickly without I/O?

-- 
[image: MagineTV]

*Mikael Ståldal*
Senior software developer

*Magine TV*
mikael.stal...@magine.com
Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com

Privileged and/or Confidential Information may be contained in this
message. If you are not the addressee indicated in this message
(or responsible for delivery of the message to such a person), you may not
copy or deliver this message to anyone. In such case,
you should destroy this message and kindly notify the sender by reply
email.


Use a specific partition of dataframe

2016-11-02 Thread Yanwei Zhang
Is it possible to retrieve a specific partition  (e.g., the first partition) of 
a DataFrame and apply some function there? My data is too large, and I just 
want to get some approximate measures using the first few partitions in the 
data. I'll illustrate what I want to accomplish using the example below:

// create data
val tmp = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 1),
  ("b", 2), ("a", 1), ("b", 2)), 2).toDF("var", "value")
// I want to get the first partition only, and do some calculation, for
// example, count by the value of "var"
tmp1 = tmp.getPartition(0)
tmp1.groupBy("var").count()

The idea is not to go through all the data to save computational time. So I am 
not sure whether mapPartitionsWithIndex is helpful in this case, since it still 
maps all data.
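
A sketch of one way to approximate this (assuming a Spark 2.x SparkSession
named spark; with 1.6, sqlContext.createDataFrame works the same way). Tasks
for the other partitions are still scheduled, but they return empty iterators
without touching their data:

val firstPartRows = tmp.rdd.mapPartitionsWithIndex(
  (idx, iter) => if (idx == 0) iter else Iterator.empty,
  preservesPartitioning = true)
val tmp1 = spark.createDataFrame(firstPartRows, tmp.schema)
tmp1.groupBy("var").count().show()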

Regards,
Wayne




Re: How to avoid unnecessary spark startups on every request?

2016-11-02 Thread Vadim Semenov
Take a look at https://github.com/spark-jobserver/spark-jobserver or
https://github.com/cloudera/livy

you can launch a persistent spark context and then submit your jobs using a
already running context

On Wed, Nov 2, 2016 at 3:34 AM, Fanjin Zeng 
wrote:

>  Hi,
>
>  I am working on a project that takes requests from HTTP server and
> computes accordingly on spark. But the problem is when I receive many
> request at the same time, users have to waste a lot of time on the
> unnecessary startups that occur on each request. Does Spark have built-in
> job scheduler function to solve this problem or is there any trick can be
> used to avoid these unnecessary startups?
>
>  Thanks,
>  Fanjin
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>




Re: Running Google Dataflow on Spark

2016-11-02 Thread Sean Owen
This is a Dataflow / Beam question, not a Spark question per se.

On Wed, Nov 2, 2016 at 11:48 AM Ashutosh Kumar 
wrote:

> I am trying to run Google Dataflow code on Spark. It works fine as google
> dataflow on google cloud platform. But while running on Spark I am getting
> following error
>
> 16/11/02 11:14:32 INFO com.cloudera.dataflow.spark.SparkPipelineRunner:
> Evaluating ParDo(GroupByKeyHashAndSortByKeyAndWindow)
> Exception in thread "main" java.lang.RuntimeException:
> java.lang.IllegalStateException: No TransformEvaluator registered for class
> com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$Grou
> pByKeyAndSortValuesOnly
> at
> com.cloudera.dataflow.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:124)
>
> Thanks
> Ashutosh
>


Running Google Dataflow on Spark

2016-11-02 Thread Ashutosh Kumar
I am trying to run Google Dataflow code on Spark. It works fine as google
dataflow on google cloud platform. But while running on Spark I am getting
following error

16/11/02 11:14:32 INFO com.cloudera.dataflow.spark.SparkPipelineRunner:
Evaluating ParDo(GroupByKeyHashAndSortByKeyAndWindow)
Exception in thread "main" java.lang.RuntimeException:
java.lang.IllegalStateException: No TransformEvaluator registered for class
com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$Grou
pByKeyAndSortValuesOnly
at
com.cloudera.dataflow.spark.SparkPipelineRunner.run(SparkPipelineRunner.java:124)

Thanks
Ashutosh


Need to know about GraphX and Streaming

2016-11-02 Thread Md. Mahedi Kaysar
Hi All,
I am new to Spark GraphX. I am trying to understand it for analysing graph
streaming data. I know Spark has streaming modules that work on both the
tabular and DStream mechanisms.
I am wondering if it is possible to leverage the streaming APIs in GraphX for
analysing real-time graph streams. What would the overhead and overall
performance be if I used it? I also want to know about ongoing research on
graph streaming.

It would be very helpful if someone could explain this to me. Thanks in
advance.


Kind Regards,

Mahedi




[Spark2] huge BloomFilters

2016-11-02 Thread ponkin
Hi,
I need to build a huge BloomFilter with 150 million or even more insertions:

import org.apache.spark.util.sketch.BloomFilter
val bf = spark.read.avro("/hdfs/path").filter("some ==
1").stat.bloomFilter("id", 15000, 0.01)

I use Kryo for serialization:

implicit val bfEncoder = org.apache.spark.sql.Encoders.kryo[BloomFilter]

When I then try to save this filter to HDFS, its size is more than 1 GB.

Is there any way to compress a BloomFilter?
Does anybody have experience with such huge Bloom filters?

In general, I need to check some condition in a Spark Streaming application,
and I was thinking of using BloomFilters for that.
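
A sketch of one option (how much this saves depends on how sparse the filter's
bit array actually is; paths are hypothetical): BloomFilter exposes
writeTo/readFrom on raw streams, so the stream can be gzipped on the way to
HDFS.

import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.util.sketch.BloomFilter

val fs  = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val out = new GZIPOutputStream(fs.create(new Path("/hdfs/path/bf.bin.gz")))
bf.writeTo(out)                      // BloomFilter's own binary format
out.close()

val in       = new GZIPInputStream(fs.open(new Path("/hdfs/path/bf.bin.gz")))
val restored = BloomFilter.readFrom(in)
in.close()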







Re: Add jar files on classpath when submitting tasks to Spark

2016-11-02 Thread Mich Talebzadeh
Well if you look at the spark-shell script this is what it says

# SPARK-4161: scala does not assume use of the java classpath,
# so we need to add the "-Dscala.usejavacp=true" flag manually. We
# do this specifically for the Spark shell because the scala REPL
# has its own class loader, and any additional classpath specified
# *through spark.driver.extraClassPath is not automatically propagated.*

Whether this is relevant or not I am not sure


HTH





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 2 November 2016 at 09:36, Jan Botorek  wrote:

> Thank you for the example.
>
> I am able to submit the task when using the –jars parameter as followed:
>
>
>
> *spark-submit --class com.infor.skyvault.tests.LinearRegressionTest
> --master local –jars path/to/jar/one;path/to/jar/two
> C:\_resources\spark-1.0-SNAPSHOT.jar -DtrainDataPath="/path/to/model/data"*
>
>
>
> But, I would like to find out, why the setting of
> *spark.driver.extraClassPath* attribute in spark-defaults.xml is not
> applied when submitting the task.
>
> In our scenario let’s assume that all workers (currently only one worker)
> have the attribute *spark.driver.extraClassPath* set to the same path and
> the folder on all workers contains the same .jar files.
>
>
>
> Thank you for your help,
>
>
>
> Regards,
>
> Jan
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Tuesday, November 1, 2016 3:22 PM
>
> *To:* Jan Botorek 
> *Cc:* Vinod Mangipudi ; user 
> *Subject:* Re: Add jar files on classpath when submitting tasks to Spark
>
>
>
> If you are using local mode then there is only one JVM. In Linux as below
> mine looks like this
>
>
>
> ${SPARK_HOME}/bin/spark-submit \
> --packages ${PACKAGES} \
> --driver-memory 8G \
> --num-executors 1 \
> --executor-memory 8G \
> *--master local[12] \*
> --conf "${SCHEDULER}" \
> --conf "${EXTRAJAVAOPTIONS}" \
> --jars ${JARS} \
> --class "${FILE_NAME}" \
> --conf "${SPARKUIPORT}" \
> --conf "${SPARKDRIVERPORT}" \
> --conf "${SPARKFILESERVERPORT}" \
> --conf "${SPARKBLOCKMANAGERPORT}" \
> --conf "${SPARKKRYOSERIALIZERBUFFERMAX}" \
> ${JAR_FILE}
>
>
>
> These parameters are defined below
>
> function default_settings {
> export PACKAGES="com.databricks:spark-csv_2.11:1.3.0"
> export SCHEDULER="spark.scheduler.mode=FAIR"
> export EXTRAJAVAOPTIONS="spark.executor.extraJavaOptions=-XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps"
> export JARS="/home/hduser/jars/spark-streaming-kafka-assembly_2.11-
> 1.6.1.jar"
> export SPARKUIPORT="spark.ui.port=5"
> export SPARKDRIVERPORT="spark.driver.port=54631"
> export SPARKFILESERVERPORT="spark.fileserver.port=54731"
> export SPARKBLOCKMANAGERPORT="spark.blockManager.port=54832"
> export SPARKKRYOSERIALIZERBUFFERMAX="spark.kryoserializer.buffer.max=512"
> }
>
>
>
> and other jar files have passed through --jars. Note that ${JAR_FILE} in
> my case is built through MVN or SBT
>
>
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>
> On 1 November 2016 at 14:02, Jan Botorek  wrote:
>
> Yes, exactly.
> My (testing) run script is:
>
> spark-submit --class com.infor.skyvault.tests.LinearRegressionTest
> --master local C:\_resources\spark-1.0-SNAPSHOT.jar
> -DtrainDataPath="/path/to/model/data"
>
>
>
>
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Tuesday, November 1, 2016 2:51 PM
> *To:* Jan Botorek 
> *Cc:* Vinod Mangipudi ; user 
>
>
> *Subject:* Re: Add jar files on classpath when submitting tasks to Spark
>
>
>
> Are you submitting your job through spark-submit?
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> 

RE: Add jar files on classpath when submitting tasks to Spark

2016-11-02 Thread Jan Botorek
Thank you for the example.
I am able to submit the task when using the --jars parameter as follows:

spark-submit --class com.infor.skyvault.tests.LinearRegressionTest --master
local --jars path/to/jar/one;path/to/jar/two
C:\_resources\spark-1.0-SNAPSHOT.jar -DtrainDataPath="/path/to/model/data"

But I would like to find out why the setting of the spark.driver.extraClassPath
attribute in spark-defaults.xml is not applied when submitting the task.
In our scenario, let's assume that all workers (currently only one worker) have
the attribute spark.driver.extraClassPath set to the same path and that the
folder on all workers contains the same .jar files.
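
For reference, the sort of entries involved, normally placed in
conf/spark-defaults.conf (a sketch; the Windows paths are hypothetical, and the
classpath separator is ';' on Windows but ':' on Linux):

spark.driver.extraClassPath     C:\libs\dep-one.jar;C:\libs\dep-two.jar
spark.executor.extraClassPath   C:\libs\dep-one.jar;C:\libs\dep-two.jar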

Thank you for your help,

Regards,
Jan

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Tuesday, November 1, 2016 3:22 PM
To: Jan Botorek 
Cc: Vinod Mangipudi ; user 
Subject: Re: Add jar files on classpath when submitting tasks to Spark

If you are using local mode then there is only one JVM. In Linux as below mine 
looks like this

${SPARK_HOME}/bin/spark-submit \
--packages ${PACKAGES} \
--driver-memory 8G \
--num-executors 1 \
--executor-memory 8G \
--master local[12] \
--conf "${SCHEDULER}" \
--conf "${EXTRAJAVAOPTIONS}" \
--jars ${JARS} \
--class "${FILE_NAME}" \
--conf "${SPARKUIPORT}" \
--conf "${SPARKDRIVERPORT}" \
--conf "${SPARKFILESERVERPORT}" \
--conf "${SPARKBLOCKMANAGERPORT}" \
--conf "${SPARKKRYOSERIALIZERBUFFERMAX}" \
${JAR_FILE}

These parameters are defined below

function default_settings {
export PACKAGES="com.databricks:spark-csv_2.11:1.3.0"
export SCHEDULER="spark.scheduler.mode=FAIR"
export EXTRAJAVAOPTIONS="spark.executor.extraJavaOptions=-XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps"
export JARS="/home/hduser/jars/spark-streaming-kafka-assembly_2.11-1.6.1.jar"
export SPARKUIPORT="spark.ui.port=5"
export SPARKDRIVERPORT="spark.driver.port=54631"
export SPARKFILESERVERPORT="spark.fileserver.port=54731"
export SPARKBLOCKMANAGERPORT="spark.blockManager.port=54832"
export SPARKKRYOSERIALIZERBUFFERMAX="spark.kryoserializer.buffer.max=512"
}

and other jar files have passed through --jars. Note that ${JAR_FILE} in my 
case is built through MVN or SBT

HTH



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 1 November 2016 at 14:02, Jan Botorek <jan.boto...@infor.com> wrote:
Yes, exactly.
My (testing) run script is:
spark-submit --class com.infor.skyvault.tests.LinearRegressionTest --master 
local C:\_resources\spark-1.0-SNAPSHOT.jar -DtrainDataPath="/path/to/model/data"



From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Tuesday, November 1, 2016 2:51 PM
To: Jan Botorek <jan.boto...@infor.com>
Cc: Vinod Mangipudi <vinod...@gmail.com>; user <user@spark.apache.org>

Subject: Re: Add jar files on classpath when submitting tasks to Spark

Are you submitting your job through spark-submit?


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 1 November 2016 at 13:39, Jan Botorek <jan.boto...@infor.com> wrote:
Hello,
This approach unfortunately doesn’t work for job submission for me. It works in 
the shell, but not when submitted.
I ensured the (only worker) node has desired directory.

Neither specifying all jars as you suggested, neither using /path/to/jarfiles/* 
works.

Could you verify, that using this settings you are able to submit jobs with 
according dependencies, please?

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Tuesday, November 1, 2016 2:18 PM
To: Vinod Mangipudi <vinod...@gmail.com>
Cc: user <user@spark.apache.org>
Subject: Re: Add jar files on classpath when submitting tasks to Spark

you can do that as long as every node has the directory referenced.

For example

spark.driver.extraClassPath  
/home/hduser/jars/ojdbc6.jar:/home/hduser/jars/jconn4.jar
spa

random idea

2016-11-02 Thread kant kodali
Hi Guys,

I have a random idea and it would be great to receive some input.

Can we have an HTTP2-based receiver for Spark Streaming? I am wondering: why
not build microservices using Spark when needed? I can see it is not meant for
that, but I like to think it could be possible. To be more concrete, here is a
scenario.

It is a very common pattern in the industry that people build a service layer
on top of a database (say Cassandra, HBase or whatever). All the service layer
does is expose a few service functions for the application via HTTP2, RPC or
some other protocol, and the service layer is built simply by using the
database driver underneath. At some point people start thinking about how to
scale this service layer, so they introduce components for service discovery,
resource management and so on. I feel like I could get all of this from Spark,
with added bonuses such as leveraging data locality via the connectors written
for a distributed database. Now, I understand Spark Streaming may not be ready
for this given its latency and mini-batch model, but at some point I am
confident that this latency issue will be addressed, in which case I do see a
path for this. What do you think?

Thanks!
kant


Re: How to avoid unnecessary spark startups on every request?

2016-11-02 Thread vincent gromakowski
Hi
I am currently using Akka HTTP to send requests to multiple Spark actors that
use a preloaded Spark context and the fair scheduler. It's only a prototype and
I haven't tested the concurrency, but it seems like one of the right ways to do
it. Complete processing time is around 600 ms. The other way would be to use a
Spark job server, but I don't like splitting my REST API in two (one business
part in Akka HTTP and one technical part in the job server).

Le 2 nov. 2016 8:34 AM, "Fanjin Zeng"  a écrit :

>  Hi,
>
>  I am working on a project that takes requests from HTTP server and
> computes accordingly on spark. But the problem is when I receive many
> request at the same time, users have to waste a lot of time on the
> unnecessary startups that occur on each request. Does Spark have built-in
> job scheduler function to solve this problem or is there any trick can be
> used to avoid these unnecessary startups?
>
>  Thanks,
>  Fanjin
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


How to avoid unnecessary spark startups on every request?

2016-11-02 Thread Fanjin Zeng
 Hi,
 
 I am working on a project that takes requests from an HTTP server and computes
accordingly on Spark. But the problem is that when I receive many requests at
the same time, users have to waste a lot of time on the unnecessary startups
that occur on each request. Does Spark have a built-in job scheduler function
to solve this problem, or is there any trick that can be used to avoid these
unnecessary startups?
 
 Thanks,
 Fanjin




How to return a case class in map function?

2016-11-02 Thread Yan Facai
Hi, all.
When I use a case class as the return value in a map function, Spark always
raises a ClassCastException.

I wrote a demo, like:

scala> case class Record(key: Int, value: String)

scala> case class ID(key: Int)

scala> val df = Seq(Record(1, "a"), Record(2, "b")).toDF

scala> df.map{x => ID(x.getInt(0))}.show

16/11/02 14:52:34 ERROR Executor: Exception in task 0.0 in stage 166.0 (TID
175)
java.lang.ClassCastException: $line1401.$read$$iw$$iw$ID cannot be cast to
$line1401.$read$$iw$$iw$ID
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)


Please tell me if I'm wrong.
Thanks.