Re: [Spark Structured Streaming on K8S]: Debug - File handles/descriptor (unix pipe) leaking

2018-07-25 Thread Yuval.Itzchakov
We're experiencing the exact same issue while running load tests on Spark
2.3.1 with Structured Streaming and `mapGroupsWithState`.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Does mapWithState need checkpointing to be specified in Spark Streaming?

2017-07-16 Thread Yuval.Itzchakov
Yes, you do.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-mapWithState-need-checkpointing-to-be-specified-in-Spark-Streaming-tp28858p28862.html



Re: How to reduce the amount of data that is getting written to the checkpoint from Spark Streaming

2017-07-03 Thread Yuval.Itzchakov
Using a long period between checkpoints may cause a long lineage of the
graph's computations to build up, since Spark uses checkpointing to truncate
it, which can also cause delays in the streaming job.
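
To make the trade-off concrete, here is a toy model in plain Python. It is
not Spark code: the function name and the numbers are made up for
illustration, and it simply assumes that every batch adds one step to the
state lineage and that a checkpoint resets it.

```python
def max_lineage_depth(batch_interval_s: int,
                      checkpoint_interval_s: int,
                      batches: int) -> int:
    """Toy model: return the deepest lineage seen over `batches` batches."""
    depth = 0
    deepest = 0
    for batch in range(1, batches + 1):
        depth += 1                      # this batch depends on the previous one
        deepest = max(deepest, depth)
        # A checkpoint is assumed to fire whenever the elapsed time is a
        # multiple of the checkpoint interval, cutting the lineage.
        if (batch * batch_interval_s) % checkpoint_interval_s == 0:
            depth = 0
    return deepest


print(max_lineage_depth(4, 40, 1000))   # 10
print(max_lineage_depth(4, 400, 1000))  # 100
```

In this model the lineage depth is roughly the checkpoint interval divided by
the batch interval, so a ten times longer interval means a ten times longer
chain to recompute after a failure.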



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-reduce-the-amount-of-data-that-is-getting-written-to-the-checkpoint-from-Spark-Streaming-tp28798p28820.html



Structured Streaming UI similar to Spark Streaming

2017-07-02 Thread Yuval.Itzchakov
Hi,

Today, Spark Streaming exposes extensive, detailed graphs of input rate,
processing time and delay. I was wondering, is there any plan to integrate
such a graph for Structured Streaming? Now with Kafka support and
implementation of stateful aggregations in Spark 2.2, it's becoming a very
attractive alternative to the current mapWithState.

Are there any roadmaps to integrate similar graphs?

Yuval.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-UI-similar-to-Spark-Streaming-tp28816.html



Re: How to reduce the amount of data that is getting written to the checkpoint from Spark Streaming

2017-07-02 Thread Yuval.Itzchakov
You can't. Spark doesn't let you fiddle with the data being checkpointed, as
it's an internal implementation detail.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-reduce-the-amount-of-data-that-is-getting-written-to-the-checkpoint-from-Spark-Streaming-tp28798p28815.html



Stateful aggregations with Structured Streaming

2016-11-19 Thread Yuval.Itzchakov
I've been using `DStream.mapWithState` and was looking forward to trying out
Structured Streaming. The thing I can't understand is, does Structured
Streaming in its current state support stateful aggregations?

Looking at the StateStore design document
(https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit#heading=h.2h7zw4ru3nw7),
and then doing a bit of digging around in the Spark codebase, I've seen
`mapPartitionsWithStateStore` as the only viable way of doing something with
a store, but the API requires an `UnsafeRow` for key and value, which makes
me question whether this is a real public API one should be using.

Does anyone know what the current state of things is with regard to an
equivalent to `mapWithState` in Structured Streaming?

Thanks,
Yuval.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stateful-aggregations-with-Structured-Streaming-tp28108.html



Structured Streaming in Spark 2.0 and DStreams

2016-05-15 Thread Yuval.Itzchakov
I've been reading/watching videos about the upcoming Spark 2.0 release which
brings us Structured Streaming. One thing I've yet to understand is how this
relates to the current state of working with Streaming in Spark with the
DStream abstraction.

All the examples I can find, in the Spark repository and in various videos,
show someone streaming local JSON files or reading from HDFS/S3/SQL. Also,
when browsing the source, SparkSession seems to be defined inside
org.apache.spark.sql, which gives me a hunch that this is all related to SQL
and the like, and not really to DStreams.

What I'm failing to understand is: Will this feature impact how we do
Streaming today? Will I be able to consume a Kafka source in a streaming
fashion (like we do today when we open a stream using KafkaUtils)? Will we
be able to do stateful operations on a Dataset[T] like we do today using
MapWithStateRDD? Or will there be a subset of operations that the Catalyst
optimizer can understand, such as aggregations?

I'd be happy if anyone could shed some light on this.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Structured-Streaming-in-Spark-2-0-and-DStreams-tp26959.html



Evicting a lower version of a library loaded in Spark Worker

2016-04-03 Thread Yuval.Itzchakov
My code uses "com.typesafe.config" in order to read configuration values.
Currently, our code uses v1.3.0, whereas Spark uses 1.2.1 internally.

When I initiate a job, the worker process invokes a method in my code but
fails, because it's defined abstract in v1.2.1 whereas in v1.3.0 it is not.
The exception message is:

java.lang.IllegalAccessError: tried to access class
com.typesafe.config.impl.SimpleConfig from class
com.typesafe.config.impl.ConfigBeanImpl
at
com.typesafe.config.impl.ConfigBeanImpl.createInternal(ConfigBeanImpl.java:40)

My spark-submit command is as follows:

spark-submit \
--driver-class-path "config-1.3.0.jar:hadoop-aws-2.7.1.jar:aws-java-sdk-1.10.62.jar" \
--driver-java-options "-Dconfig.file=/classes/application.conf -Dlog4j.configurationFile=/classes/log4j2.xml -XX:+UseG1GC -XX:+UseStringDeduplication" \
--conf "spark.streaming.backpressure.enabled=true" \
--conf "spark.executor.memory=5g" \
--conf "spark.driver.memory=5g" \
--conf "spark.executor.extraClassPath=./config-1.3.0.jar:./hadoop-aws-2.7.1.jar:./aws-java-sdk-1.10.62.jar" \
--conf "spark.executor.extraJavaOptionsFile=-Dconfig.file=./application.conf -Dlog4j.configurationFile=./classes/log4j2.xml -XX:+UseG1GC -XX:+UseStringDeduplication" \
--class SparkRunner spark-job-0.1.1.jar

This still fails, even though the Spark worker does load the newer jar. This
is what I see on the worker node:

Fetching http://***/jars/config-1.3.0.jar to
/tmp/spark-cac4dfb9-bf59-49bc-ab81-e24923051c86/executor-1104e522-3fa5-4eff-8e0c-43b3c3b24c65/spark-1ce3b6bd-92f2-4652-993e-4f7054d07d21/fetchFileTemp3242938794803852920.tmp
16/04/03 13:57:20 INFO util.Utils: Copying
/tmp/spark-cac4dfb9-bf59-49bc-ab81-e24923051c86/executor-1104e522-3fa5-4eff-8e0c-43b3c3b24c65/spark-1ce3b6bd-92f2-4652-993e-4f7054d07d21/-13916990391459691836506_cache
to /var/run/spark/work/app-20160403135716-0040/1/./config-1.3.0.jar
16/04/03 13:57:20 INFO executor.Executor: Adding
file:/var/run/spark/work/app-20160403135716-0040/1/./config-1.3.0.jar to
class loader

I have tried setting "spark.executor.userClassPathFirst" to true, but then
it blows up with an error saying a different SLF4J library was already
loaded, and crashes the worker process.

Has anyone had to achieve something similar?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Evicting-a-lower-version-of-a-library-loaded-in-Spark-Worker-tp26664.html



Re: value saveToCassandra is not a member of org.apache.spark.streaming.dstream.DStream[(String, Int)]

2016-04-03 Thread Yuval.Itzchakov
You need to import com.datastax.spark.connector.streaming._ to have the
methods available.
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/value-saveToCassandra-is-not-a-member-of-org-apache-spark-streaming-dstream-DStream-String-Int-tp26655p26663.html



Setting up log4j2/logback with Spark 1.6.0

2016-03-19 Thread Yuval.Itzchakov
I've been trying to get log4j2 and logback to play nicely with Spark
1.6.0 so I can properly offload my logs to a remote server.

I've attempted the following things:

1. Setting logback/log4j2 on the class path for both the driver and worker
nodes
2. Passing -Dlog4j.configurationFile= and -Dlogback.configuration= flags to
-extraJavaOptions

log4j2 used to work on Spark 1.5.2. After we upgraded, the default
logging framework defers to log4j 1.2. Even when I get Spark to work with
logback, it doesn't find my logback.xml file located in
%APP_DIRECTORY%/classes/logback.xml. 

I'm always seeing Spark defer to this:

"Reading configuration from URL
jar:file:/usr/lib/spark/lib/spark-assembly-1.6.0-hadoop2.7.1.jar!/org/apache/spark/log4j-defaults.properties"

Has anyone had similar issues with this?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Setting-up-log4j2-logback-with-Spark-1-6-0-tp26518.html



Using dynamic allocation and shuffle service in Standalone Mode

2016-03-08 Thread Yuval.Itzchakov
Hi, 
I'm using Spark 1.6.0, and according to the documentation, dynamic
allocation and spark shuffle service should be enabled.

When I submit a spark job via the following:

spark-submit \
--master  \
--deploy-mode cluster \
--executor-cores 3 \
--conf "spark.streaming.backpressure.enabled=true" \
--conf "spark.dynamicAllocation.enabled=true" \
--conf "spark.dynamicAllocation.minExecutors=2" \
--conf "spark.dynamicAllocation.maxExecutors=24" \
--conf "spark.shuffle.service.enabled=true" \
--conf "spark.executor.memory=8g" \
--conf "spark.driver.memory=10g" \
--class SparkJobRunner /opt/clicktale/entityCreator/com.clicktale.ai.entity-creator-assembly-0.0.2.jar

I'm seeing error logs from the workers being unable to connect to the
shuffle service:

16/03/08 17:33:15 ERROR storage.BlockManager: Failed to connect to external shuffle server, will retry 2 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
    at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
    at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:211)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:208)
    at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)
    at org.apache.spark.executor.Executor.<init>(Executor.scala:85)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I verified all relevant ports are open. Has anyone else experienced such a
failure?

Yuval.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-dynamic-allocation-and-shuffle-service-in-Standalone-Mode-tp26430.html



Re: Spark Streaming fileStream vs textFileStream

2016-03-06 Thread Yuval.Itzchakov
I don't think the documentation can be any more descriptive:

  /**
   * Create a input stream that monitors a Hadoop-compatible filesystem
   * for new files and reads them using the given key-value types and input format.
   * Files must be written to the monitored directory by "moving" them from another
   * location within the same file system. File names starting with . are ignored.
   * @param directory HDFS directory to monitor for new file
   * @tparam K Key type for reading HDFS file
   * @tparam V Value type for reading HDFS file
   * @tparam F Input format for reading HDFS file
   */



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fileStream-vs-textFileStream-tp26407p26410.html



Continuous deployment to Spark Streaming application with sessionization

2016-03-06 Thread Yuval.Itzchakov
I've been recently thinking about continuous deployment to our spark
streaming service.

We have a streaming application which does sessionization via
`mapWithState`, aggregating sessions in memory until they are ready to be
deployed.

Now, as I see things we have two use cases here:

1. Spark streaming DAG didn't change - In this particular case, there
shouldn't be a problem as the spark DAG is checkpointed every X seconds, so
we should have most of our state saved and loaded.

2. Spark streaming DAG changed - As far as I understand, if the spark
DAG changes between releases, checkpointed data cannot be read and
initialized again. This means that we'd actually lose all the state that was
saved up until we terminated our job.

Has anyone thought about this scenario? How do you guys deal with this in
production?

Yuval.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Continuous-deployment-to-Spark-Streaming-application-with-sessionization-tp26409.html



Using a non serializable third party JSON serializable on a spark worker node throws NotSerializableException

2016-03-01 Thread Yuval.Itzchakov
I have a small snippet of code which relies on argonaut for JSON
serialization, which is run from a `PairDStreamFunctions.mapWithState` once a
session is completed.

This is the code snippet (not that important):

  override def sendMessage(pageView: PageView): Unit = {
    Future {
      LogHolder.logger.info(s"Sending pageview: ${pageView.id} to automation")
      try {
        Http(url)
          .postData(pageView.asJson.toString)
          .option(HttpOptions.connTimeout(timeOutMilliseconds))
          .asString
          .throwError
      }
      catch {
        case NonFatal(e) => LogHolder.logger.error("Failed to send pageview", e)
      }
    }
  }

argonaut relies on a user implementation of a trait called `EncodeJson[T]`,
which tells argonaut how to serialize the object.

The problem is that the trait `EncodeJson[T]` is not serializable, thus
throwing a NotSerializableException:

Caused by: java.io.NotSerializableException: argonaut.EncodeJson$$anon$2
Serialization stack:
- object not serializable (class: argonaut.EncodeJson$$anon$2,
value: argonaut.EncodeJson$$anon$2@6415f61e)

This is obvious and understandable.

The question I have is: what possible ways are there to work around this?
I currently depend on a third-party library that I don't control and can't
change to implement Serializable. I've seen this StackOverflow answer
but couldn't implement any reasonable workaround.

Does anyone have any ideas?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-a-non-serializable-third-party-JSON-serializable-on-a-spark-worker-node-throws-NotSerializablen-tp26372.html



Re: Spark jobs run extremely slow on yarn cluster compared to standalone spark

2016-02-14 Thread Yuval.Itzchakov
Your question lacks sufficient information for us to actually provide help.
Have you looked at the Spark UI to see which part of the graph is taking the
longest? Have you tried logging your methods?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-jobs-run-extremely-slow-on-yarn-cluster-compared-to-standalone-spark-tp26215p26221.html



Re: how can i write map(x => x._1 ++ x._2) statement in python.??

2016-02-08 Thread Yuval.Itzchakov
In Python, concatenating two lists can be done simply using the + operator.
I'm assuming each element of the RDD you're mapping over is a tuple of two
lists:

map(lambda x: x[0] + x[1])
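
Here is a quick way to check the idea without Spark, using Python's built-in
map over a plain list. The sample data is made up, and it assumes each record
is a tuple of two lists:

```python
# Hypothetical sample data: each record is a tuple of two lists.
records = [([1, 2], [3]), (["a"], ["b", "c"])]

# Scala's `x._1 ++ x._2` corresponds to `x[0] + x[1]` in Python.
merged = list(map(lambda x: x[0] + x[1], records))
print(merged)  # [[1, 2, 3], ['a', 'b', 'c']]
```

On an actual RDD the same lambda would be passed to its map method, i.e.
rdd.map(lambda x: x[0] + x[1]).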




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-can-i-write-map-x-x-1-x-2-statement-in-python-tp26172p26173.html



Re: Apache Spark data locality when integrating with Kafka

2016-02-07 Thread Yuval.Itzchakov
I would definitely try to avoid hosting Kafka and Spark on the same servers. 

Kafka and Spark will be doing a lot of IO between them, so you'll want to
maximize those resources and not share them on the same server. You'll
want each Kafka broker to be on a dedicated server, as well as your Spark
master and workers. If you're hosting them on Amazon EC2 instances, then
you'll want these to be in the same availability zone, so you can benefit
from low latency within that zone. If you're on dedicated servers,
perhaps you'll want to create a VPC between the two clusters so you can,
again, benefit from low IO latency and high throughput.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165p26170.html



PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Yuval.Itzchakov
Hi,
I've been playing with the experimental PairDStreamFunctions.mapWithState
feature and I seem to have stumbled across a bug, and was wondering if
anyone else has been seeing this behavior.

I've opened an issue in the Spark JIRA. I simply want to pass this along
in case anyone else is experiencing such a failure, or perhaps someone has
insight into whether this is actually a bug: SPARK-13195

Using the new spark mapWithState API, I've encountered a bug when setting a
timeout for mapWithState but no explicit state handling.

h1. Steps to reproduce:

1. Create a method which conforms to the StateSpec signature, make sure to
not update any state inside it using *state.update*. Simply create a "pass
through" method, may even be empty.
2. Create a StateSpec object with method from step 1, which explicitly sets
a timeout using *StateSpec.timeout* method.
3. Create a DStream pipeline that uses mapWithState with the given
StateSpec.
4. Run code using spark-submit. You'll see that the method ends up throwing
the following exception:

{code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 (TID 176, ): java.util.NoSuchElementException: State is not set
    at org.apache.spark.streaming.StateImpl.get(State.scala:150)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
    at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}

h1. Sample code to reproduce the issue:

{code:Title=MainObject}
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by yuvali on 04/02/2016.
  */
object Program {

  def main(args: Array[String]): Unit = {

    val sc = new SparkConf().setAppName("mapWithState bug reproduce")
    val sparkContext = new SparkContext(sc)

    val ssc = new StreamingContext(sparkContext, Seconds(4))
    // A timeout is set here, but trackStateFunc never updates the state
    val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))

    // Create a stream from the dummy receiver (ratePerSec = 10)
    val stream = ssc.receiverStream(new DummySource(10))

    // Split the lines into words, and create a paired (key-value) dstream
    val wordStream = stream.flatMap {
      _.split(" ")
    }.map(word => (word, 1))

    // This represents the emitted stream from the trackStateFunc. Since we
    // emit every input record with the updated value, this stream will
    // contain the same # of records as the input dstream.
    val wordCountStateStream = wordStream.mapWithState(stateSpec)
    wordCountStateStream.print()

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))

    // Don't forget to set checkpoint directory
    ssc.checkpoint("")
    ssc.start()
    ssc.awaitTermination()
  }

  // Deliberately never calls state.update, which is what triggers the failure
  def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
                     state: State[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    val output = (key, sum)
    Some(output)
  }
}
{code}

{code:Title=DummySource}

/**
  * Created by yuvali on 04/02/2016.
  */

import org.apache.spark.storage.StorageLevel
import scala.util.Random
import org.apache.spark.streaming.receiver._

class DummySource(ratePerSec: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Dummy Source") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do