Bulk insert strategy
In the Spark Streaming Programming Guide, the suggested pattern for using a connection while processing a DStream is the following:

dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  })
})

In this case both the processing and the insertion are done on the workers, and we don't use a batch insert into the DB. What about this alternative: process the records on the workers (parse JSON strings into objects), send those objects back to the master, and then issue a single bulk-insert request from there? Is there any benefit to sending records individually through a connection pool versus using a bulk operation on the master?

A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred
(M) 880-175-5592433
Twitter | Blog | Facebook
Check out The Academy, your #1 source for free content marketing resources
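Sending everything back to the master (for example with `collect()`) pulls every record through the single driver JVM, so a common middle ground is to keep the writes on the workers but batch them per partition. A minimal plain-Scala sketch, assuming your DB client exposes some bulk-insert call; `BulkSink` and `writePartition` are hypothetical names, not Spark or database-driver APIs:

```scala
// Hypothetical stand-in for a DB client that supports bulk writes.
trait BulkSink[A] {
  def insertBatch(batch: Seq[A]): Unit
}

object PartitionBatching {
  // Writes one partition's records in chunks of `batchSize`, issuing one
  // bulk call per chunk instead of one call per record.
  // Returns the number of bulk calls made.
  def writePartition[A](records: Iterator[A], sink: BulkSink[A], batchSize: Int): Int = {
    require(batchSize > 0, "batchSize must be positive")
    var calls = 0
    // Iterator.grouped yields full chunks plus a final, possibly shorter, one.
    records.grouped(batchSize).foreach { batch =>
      sink.insertBatch(batch)
      calls += 1
    }
    calls
  }
}
```

Inside Spark, `writePartition` would be called from `rdd.foreachPartition`, with the sink obtained on the worker (for instance from a pool) rather than captured from the driver. The batch size is a tuning knob: larger batches mean fewer round trips but more memory held per task.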
Re: Connection pool in workers
Thanks Chris, that is what I wanted to know :)

A.K.M. Ashrafuzzaman

On Mar 2, 2015, at 2:04 AM, Chris Fregly wrote:
> hey AKM!
>
> this is a very common problem. the streaming programming guide addresses
> this issue here, actually:
> http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> the tl;dr is this:
> 1) you want to use foreachPartition() inside foreachRDD() to operate on a whole
> partition, rather than on a single record at a time with foreach()
> 2) you want to get/release connections from the ConnectionPool within each worker
> 3) make sure you initialize the ConnectionPool first, or do it lazily upon
> getting the first connection.
>
> here's the sample code referenced in the link above with some additional
> comments for clarity:
>
> dstream.foreachRDD { rdd =>
>   // everything within here runs on the Driver
>
>   rdd.foreachPartition { partitionOfRecords =>
>     // everything within here runs on the Worker and operates on a partition of records
>
>     // ConnectionPool is a static, lazily initialized singleton pool of
>     // connections that runs within the Worker JVM
>
>     // retrieve a connection from the pool
>     val connection = ConnectionPool.getConnection()
>
>     // perform the application logic here: parse and write to MongoDB using the connection
>     partitionOfRecords.foreach(record => connection.send(record))
>
>     // return to the pool for future reuse
>     ConnectionPool.returnConnection(connection)
>   }
> }
>
> hope that helps!
>
> -chris
>
> On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman wrote:
>> Sorry guys, my bad. Here is a high-level code sample: [...]
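The guide leaves `ConnectionPool` undefined. One way to get a "static, lazily initialized" pool in Scala is a top-level `object`, which the JVM initializes on first use, so each executor JVM builds its own pool the first time a task touches it. A minimal sketch with a hypothetical `Connection` class, reusing the `getConnection`/`returnConnection` names from the snippet; a real pool would also bound its size, validate idle connections, and close them on shutdown:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical connection type; a real one would wrap a DB or HTTP client.
final class Connection(val id: Int) {
  def send(record: String): Unit = () // placeholder for a real write
}

// One instance per JVM. Scala objects are initialized on first access,
// so the pool is only created in JVMs (executors) that actually use it.
object ConnectionPool {
  private val idle = new ConcurrentLinkedQueue[Connection]()
  private val created = new AtomicInteger(0)

  // Reuse an idle connection if one exists, otherwise open a new one.
  def getConnection(): Connection = {
    val pooled = idle.poll() // null when no idle connection is available
    if (pooled != null) pooled else new Connection(created.incrementAndGet())
  }

  // Hand the connection back so later tasks in this JVM can reuse it.
  def returnConnection(c: Connection): Unit = idle.offer(c)

  def totalCreated: Int = created.get()
}
```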
Re: Connection pool in workers
Sorry guys, my bad. Here is a high-level code sample:

val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD(rdd => {
  rdd.foreach(tweet => {
    val strTweet = new String(tweet, "UTF-8")
    val interaction = InteractionParser.parser(strTweet)
    interactionDAL.insert(interaction)
  })
})

Here I have to close the connection for interactionDAL; otherwise the JVM gives me an error that the connection is open. I also tried a sticky connection with keep_alive set to true. So my guess was that at “unionStreams.foreachRDD” or at “rdd.foreach” the code is marshaled (serialized) and sent to the workers, and the workers unmarshal and execute it, which is why a connection is always opened for each RDD. I might be completely wrong; I would love to know what is going on underneath.
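The marshaling guess is close to what happens: the function passed to `rdd.foreach` is serialized on the driver and deserialized on the executors, so any object it captures has to be serializable. This plain-Scala sketch reproduces that step with Java serialization, to show why a connection-holding object created outside the closure is a problem while one created inside the per-partition function is not. `FakeDal` and `ClosureCheck` are hypothetical, and Spark's actual closure handling and error messages differ in detail:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical DAL holding a live connection; like most socket/driver
// wrappers, it is NOT Serializable.
class FakeDal {
  def insert(s: String): Unit = ()
}

object ClosureCheck extends Serializable {
  // Roughly the step Spark performs before shipping a task: Java-serialize the function.
  def serializes(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The DAL is created outside the function (think: on the driver) and captured.
  def capturing(): String => Unit = {
    val dal = new FakeDal
    (s: String) => dal.insert(s)
  }

  // The DAL is created inside the function (think: per partition on the worker),
  // so nothing non-serializable is captured.
  def creatingInside(): Iterator[String] => Unit =
    (records: Iterator[String]) => {
      val dal = new FakeDal
      records.foreach(r => dal.insert(r))
    }
}
```

In Spark terms, `creatingInside` has the same shape as the function you would pass to `rdd.foreachPartition`.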
Getting OutOfMemoryError and Worker.run caught exception
(LiveListenerBus.scala:47)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)
2014-12-18 01:52:19,171 [ Worker.java:Executor task launch worker-0:353] - Worker.run caught exception, sleeping for 1000 milli seconds!
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:351)
    at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:131)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I have no clue how to resolve the issue. I will do a memory leak test, but this is a simple and small application, and I don't see a leak there with the naked eye. Can anyone help me with how I should investigate?

A.K.M. Ashrafuzzaman
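One low-tech way to start investigating is to log heap usage from inside the application (for example once per batch) and check whether the used heap keeps climbing across batches instead of dropping back after garbage collections. A small sketch using only `java.lang.Runtime`; `HeapWatch` and `HeapSnapshot` are hypothetical helpers:

```scala
// Snapshot of the current JVM's heap, in megabytes.
final case class HeapSnapshot(usedMb: Long, committedMb: Long, maxMb: Long)

object HeapWatch {
  private val Mb = 1024L * 1024L

  def snapshot(): HeapSnapshot = {
    val rt = Runtime.getRuntime
    val total = rt.totalMemory() // heap currently committed by the JVM
    // Guard against the two reads racing with a heap resize.
    val used = math.max(0L, total - rt.freeMemory())
    HeapSnapshot(used / Mb, total / Mb, rt.maxMemory() / Mb)
  }

  // Print one line and return the snapshot so callers can also track trends.
  def log(tag: String): HeapSnapshot = {
    val s = snapshot()
    println(s"[$tag] heap used=${s.usedMb}MB committed=${s.committedMb}MB max=${s.maxMb}MB")
    s
  }
}
```

For a closer look, the standard HotSpot option `-XX:+HeapDumpOnOutOfMemoryError` writes a heap dump when the error occurs; for executors it can be passed through `spark.executor.extraJavaOptions`, and the dump can then be opened in a heap analyzer to see which objects dominate.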
Re: Having problem with Spark streaming with Kinesis
Thanks Aniket. The trick is to have #workers >= #shards + 1, but I don't know why that is. In the Spark Streaming Kinesis architecture figure at http://spark.apache.org/docs/latest/streaming-kinesis-integration.html, it seems like one node should be able to take on more than one shard.

A.K.M. Ashrafuzzaman

On Nov 26, 2014, at 6:23 PM, A.K.M. Ashrafuzzaman wrote:
> Hi guys,
> When we are using Kinesis with 1 shard, it works fine. [...]
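On the figure question: according to the Spark Kinesis integration guide, a single Kinesis input DStream can read from multiple shards, so the shards + 1 rule comes from the KinesisWordCountASL example creating one receiver per shard. What does hold in general is that each receiver occupies one core for as long as it runs, and at least one more core is needed to process the received data. A hedged sketch of sizing the receiver count from the available cores instead; `ReceiverPlanner` is a hypothetical helper, not part of Spark:

```scala
object ReceiverPlanner {
  // Each receiver permanently holds one core, and at least one more core
  // is needed to process the received batches.
  // Returns how many Kinesis receivers (input DStreams) to create, or None
  // if there are not enough cores for even one receiver plus processing.
  def receiversFor(numShards: Int, availableCores: Int): Option[Int] = {
    require(numShards > 0, "a Kinesis stream has at least one shard")
    val maxReceivers = availableCores - 1
    if (maxReceivers < 1) None
    else Some(math.min(numShards, maxReceivers)) // never more receivers than shards
  }
}
```

With 3 shards on a 2-core machine this suggests a single receiver (expected to pick up all 3 shards through its Kinesis worker) rather than 3 receivers that would leave no core for processing.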
Re: Having problem with Spark streaming with Kinesis
Guys, on my local machine it consumes a Kinesis stream with 3 shards, but on EC2 it does not consume from the stream. Later we found that the EC2 machine had 2 cores and my local machine had 4 cores. I am using a single machine in Spark standalone mode. We got a larger machine from EC2, and now the Kinesis stream is being consumed.

4 cores, single machine -> works
2 cores, single machine -> does not work
2 cores, 2 workers -> does not work

So my question is: do we need a cluster of (#KinesisShards + 1) workers to be able to consume from Kinesis?

A.K.M. Ashrafuzzaman

On Nov 27, 2014, at 10:28 AM, Aniket Bhatnagar wrote:
> Did you set the Spark master as local[*]? If so, the number of executor
> threads is equal to the number of cores of the machine. Perhaps your Mac
> has more cores (certainly more than the number of Kinesis shards + 1).
>
> Try explicitly setting the master as local[N], where N is the number of
> Kinesis shards + 1. It should then work on both machines.
>
> On Thu, Nov 27, 2014, 9:46 AM Ashrafuzzaman wrote:
> I was trying on one machine with just sbt run.
>
> And it is working in my Mac environment with the same configuration.
>
> I used the sample code from
> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>
> val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
> kinesisClient.setEndpoint(endpointUrl)
> val numShards =
>   kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
>
> /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
> val numStreams = numShards
>
> /* Set up the SparkConfig and StreamingContext */
> /* Spark Streaming batch interval */
> val batchInterval = Milliseconds(2000)
> val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
> val ssc = new StreamingContext(sparkConfig, batchInterval)
>
> /* Kinesis checkpoint interval. Same as batchInterval for this example. */
> val kinesisCheckpointInterval = batchInterval
>
> /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
> val kinesisStreams = (0 until numStreams).map { i =>
>   KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>     InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
> }
>
> /* Union all the streams */
> val unionStreams = ssc.union(kinesisStreams)
>
> /* Convert each line of Array[Byte] to String, split into words, and count them */
> val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
>
> /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
> val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
>
> /* Print the first 10 wordCounts */
> wordCounts.print()
>
> /* Start the streaming context and await termination */
> ssc.start()
> ssc.awaitTermination()
>
> A.K.M. Ashrafuzzaman
>
> On Wed, Nov 26, 2014 at 11:26 PM, Aniket Bhatnagar wrote:
> What's your cluster size? For streaming to work, it needs shards + 1 executors.
>
> On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman wrote:
> Hi guys,
> When we are using Kinesis with 1 shard, it works fine. [...]
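Aniket's local[N] advice and the observations above reduce to one inequality: with one receiver per shard, the number of cores must be strictly greater than the number of shards. A sketch of that check plus the suggested master string; the names are hypothetical, and on a real cluster the same arithmetic applies to the total executor cores given to the application:

```scala
object LocalMasterCheck {
  // Master URL for local mode with N = shards + 1 threads, as suggested when
  // the job creates one Kinesis receiver per shard (e.g. for SparkConf.setMaster).
  def localMaster(numShards: Int): String = s"local[${numShards + 1}]"

  // Each receiver holds one core; processing needs at least one more.
  def canProcess(cores: Int, numReceivers: Int): Boolean = cores > numReceivers

  def main(args: Array[String]): Unit = {
    val shards = 3 // one receiver per shard, as in KinesisWordCountASL
    for (cores <- Seq(2, 4))
      println(s"$cores cores, $shards receivers -> canProcess = ${canProcess(cores, shards)}")
    println(localMaster(shards))
  }
}
```

This matches the table: 3 shards means 3 receivers, so the 4-core machine leaves one core for processing and the 2-core machine leaves none.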
Having problem with Spark streaming with Kinesis
Hi guys,

When we are using Kinesis with 1 shard, it works fine. But when we use more than 1, it falls into an infinite loop and no data is processed by Spark Streaming. In the Kinesis DynamoDB table, I can see that it keeps increasing the leaseCounter, but it does not start processing.

I am using:
scala: 2.10.4
java version: 1.8.0_25
Spark: 1.1.0
spark-streaming-kinesis-asl: 1.1.0

A.K.M. Ashrafuzzaman