Bulk insert strategy

2015-03-07 Thread A.K.M. Ashrafuzzaman
For processing a DStream, the Spark Streaming Programming Guide suggests the following 
usage of connections:

dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  })
})

In this case both the processing and the insertion are done on the workers, and we 
don’t use batch inserts into the DB there. How about this use case: we do the 
processing on the workers (parse the JSON strings into objects), send those objects 
back to the driver/master, and then issue a single bulk insert request. Is there any 
benefit to inserting records individually through a connection pool on the workers 
versus collecting them and running a bulk operation on the master?
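
For illustration only, one middle ground keeps the parsing on the workers but batches 
each partition before writing, so every partition issues a handful of bulk requests 
instead of one insert per record. This is a minimal sketch rather than anything from 
the guide; ConnectionPool, parseJson and bulkInsert are hypothetical names standing in 
for the application's own pool, parser and driver call:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords
      .map(parseJson)                              // parse the JSON strings into objects on the worker
      .grouped(500)                                // hypothetical batch size
      .foreach(batch => connection.bulkInsert(batch))
    ConnectionPool.returnConnection(connection)    // return to the pool for future reuse
  }
}

Collecting everything back to the master with rdd.collect() and bulk-inserting there 
also works for small batches, but it funnels every record through a single JVM and 
gives up the write parallelism of the workers.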
    
A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred




Re: Connection pool in workers

2015-03-02 Thread A.K.M. Ashrafuzzaman
Thanks Chris,
That is what I wanted to know :)

A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred


On Mar 2, 2015, at 2:04 AM, Chris Fregly  wrote:

> hey AKM!
> 
> this is a very common problem.  the streaming programming guide addresses 
> this issue here, actually:  
> http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
> 
> the tl;dr is this:
> 1) you want to use foreachPartition() to operate on a whole partition versus 
> a single record at a time with rdd.foreach()
> 2) you want to get/release connections from the ConnectionPool within each worker
> 3) make sure you initialize the ConnectionPool first - or do it lazily upon 
> getting the first connection.
> 
> here's the sample code referenced in the link above with some additional 
> comments for clarity:
> 
> dstream.foreachRDD { rdd =>
>   // everything within here runs on the Driver
> 
>   rdd.foreachPartition { partitionOfRecords =>
>     // everything within here runs on the Worker and operates on a partition of records
> 
>     // ConnectionPool is a static, lazily initialized singleton pool of connections that runs within the Worker JVM
> 
>     // retrieve a connection from the pool
>     val connection = ConnectionPool.getConnection()
> 
>     // perform the application logic here - parse and write to mongodb using the connection
>     partitionOfRecords.foreach(record => connection.send(record))
> 
>     // return to the pool for future reuse
>     ConnectionPool.returnConnection(connection)
>   }
> }
> 
> hope that helps!
> 
> -chris
> 
> 
> 
> 
> On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman 
>  wrote:
> Sorry guys, my bad.
> Here is a high level code sample,
> 
> val unionStreams = ssc.union(kinesisStreams)
> unionStreams.foreachRDD(rdd => {
>   rdd.foreach { tweet =>
>     val strTweet = new String(tweet, "UTF-8")
>     val interaction = InteractionParser.parser(strTweet)
>     interactionDAL.insert(interaction)
>   }
> })
> 
> Here I have to close the connection for interactionDAL, otherwise the JVM 
> gives me an error that the connection is still open. I tried with a sticky 
> connection as well, with keep_alive set to true. So my guess is that at 
> “unionStreams.foreachRDD” or at “rdd.foreach” the closure is serialized and sent 
> to the workers, which deserialize and execute it, and that is why a new 
> connection is opened for each RDD. I might be completely wrong. I would 
> love to know what is going on underneath.
> 
> 



Re: Connection pool in workers

2015-03-01 Thread A.K.M. Ashrafuzzaman
Sorry guys, my bad.
Here is a high level code sample,

val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD(rdd => {
  rdd.foreach { tweet =>
    val strTweet = new String(tweet, "UTF-8")
    val interaction = InteractionParser.parser(strTweet)
    interactionDAL.insert(interaction)
  }
})

Here I have to close the connection for interactionDAL, otherwise the JVM gives 
me an error that the connection is still open. I tried with a sticky connection 
as well, with keep_alive set to true. So my guess is that at “unionStreams.foreachRDD” 
or at “rdd.foreach” the closure is serialized and sent to the workers, which 
deserialize and execute it, and that is why a new connection is opened for each 
RDD. I might be completely wrong. I would love to know what is going on underneath.
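
For reference, the “static, lazily initialized pool of connections” that the 
programming guide (and Chris's reply above) mentions can be pictured as a plain 
Scala singleton object: each worker JVM instantiates it independently the first 
time a task touches it, so no live connection ever has to be serialized with the 
closure. A minimal sketch under that assumption; the Connection trait and the 
factory are hypothetical placeholders, not this thread's actual DAL:

import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical connection type standing in for a real driver connection.
trait Connection {
  def send(record: String): Unit
  def close(): Unit
}

object ConnectionPool {
  // Supplied by the application; a real pool would open driver connections here.
  @volatile var factory: () => Connection =
    () => sys.error("no connection factory configured")

  // Created lazily, once per JVM, the first time a worker task asks for a connection.
  private lazy val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = Option(pool.poll()).getOrElse(factory())

  def returnConnection(c: Connection): Unit = pool.add(c)
}

Combined with foreachPartition, this gives one connection checkout per partition 
instead of one per record, and the connection never leaves the worker JVM.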



Getting OutOfMemoryError and Worker.run caught exception

2014-12-17 Thread A.K.M. Ashrafuzzaman
(LiveListenerBus.scala:47)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)
2014-12-18 01:52:19,171 [ Worker.java:Executor task launch worker-0:353] - Worker.run caught exception, sleeping for 1000 milli seconds!
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:351)
    at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:131)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:264)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


I have no clue how to resolve the issue. I will do a memory leak test, but this 
is a simple and small application and I don’t see a leak with the naked eye.
Can anyone help me with how I should investigate?
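
One way to start investigating, offered here only as a sketch and not something from 
this thread, is to give the executors more headroom and have the JVM write a heap dump 
when it runs out of memory, so the dump can be opened in a profiler to see what is 
accumulating. The configuration keys are standard Spark settings; the values and app 
name are placeholders:

import org.apache.spark.SparkConf

val sparkConfig = new SparkConf()
  .setAppName("KinesisConsumer")            // placeholder app name
  .set("spark.executor.memory", "4g")       // placeholder size
  .set("spark.executor.extraJavaOptions",
       "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp")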

A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred




Re: Having problem with Spark streaming with Kinesis

2014-12-13 Thread A.K.M. Ashrafuzzaman
Thanks Aniket,
The trick is to have #workers >= #shards + 1, but I don’t know why that is.
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html

Here in the figure [Spark Streaming Kinesis architecture], it seems like one 
node should be able to take on more than one shard.


A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred


On Nov 26, 2014, at 6:23 PM, A.K.M. Ashrafuzzaman  
wrote:

> Hi guys,
> When we are using Kinesis with 1 shard then it works fine. But when we use 
> more than 1, it falls into an infinite loop and no data is processed by 
> Spark Streaming. In the Kinesis DynamoDB table, I can see that it keeps 
> increasing the leaseCounter, but it does not start processing.
> 
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
> 
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred
> 
> 



Re: Having problem with Spark streaming with Kinesis

2014-12-03 Thread A.K.M. Ashrafuzzaman
Guys,
In my local machine it consumes a stream of Kinesis with 3 shards. But in EC2 
it does not consume from the stream. Later we found that the EC2 machine was of 
2 cores and my local machine was of 4 cores. I am using a single machine and in 
spark standalone mode. And we got a larger machine from EC2 and now the kinesis 
is getting consumed.

4 cores Single machine -> works
2 cores Single machine -> does not work
2 cores 2 workers -> does not work

So my question is that do we need a cluster of (#KinesisShards + 1) workers to 
be able to consume from Kinesis?
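
As a rough sketch of Aniket's suggestion quoted below: each Kinesis receiver occupies 
one core (task slot) for as long as it runs, so with one receiver per shard the 
application needs at least numShards + 1 cores, the extra one being left over for 
processing the received batches. The shard count and app name here are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

val numShards = 3                                // placeholder shard count
val sparkConfig = new SparkConf()
  .setAppName("KinesisConsumer")                 // placeholder app name
  .setMaster(s"local[${numShards + 1}]")         // one core per receiver + 1 for processing
val ssc = new StreamingContext(sparkConfig, Milliseconds(2000))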


A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred


On Nov 27, 2014, at 10:28 AM, Aniket Bhatnagar  
wrote:

> Did you set the Spark master as local[*]? If so, the number of 
> executor threads is equal to the number of cores of the machine. Perhaps your Mac 
> has more cores (certainly more than the number of Kinesis shards + 1).
> 
> Try explicitly setting the master as local[N], where N is the number of Kinesis 
> shards + 1. It should then work on both machines.
> 
> On Thu, Nov 27, 2014, 9:46 AM Ashrafuzzaman  
> wrote:
> I was trying on one machine with just sbt run.
> 
> And it works in my Mac environment with the same configuration.
> 
> I used the sample code from 
> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
> 
> 
> val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
> kinesisClient.setEndpoint(endpointUrl)
> val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
> 
> /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
> val numStreams = numShards
> 
> /* Setup the SparkConfig and StreamingContext */
> /* Spark Streaming batch interval */
> val batchInterval = Milliseconds(2000)
> val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
> val ssc = new StreamingContext(sparkConfig, batchInterval)
> 
> /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
> val kinesisCheckpointInterval = batchInterval
> 
> /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
> val kinesisStreams = (0 until numStreams).map { i =>
>   KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>     InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
> }
> 
> /* Union all the streams */
> val unionStreams = ssc.union(kinesisStreams)
> 
> /* Convert each line of Array[Byte] to String, split into words, and count them */
> val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
> 
> /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
> val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
> 
> /* Print the first 10 wordCounts */
> wordCounts.print()
> 
> /* Start the streaming context and await termination */
> ssc.start()
> ssc.awaitTermination()
> 
> 
> 
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred
> 
> 
> On Wed, Nov 26, 2014 at 11:26 PM, Aniket Bhatnagar 
>  wrote:
> What's your cluster size? For streaming to work, it needs shards + 1 executors.
> 
> On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman 
>  wrote:
> Hi guys,
> When we are using Kinesis with 1 shard then it works fine. But when we use 
> more than 1, it falls into an infinite loop and no data is processed by 
> Spark Streaming. In the Kinesis DynamoDB table, I can see that it keeps 
> increasing the leaseCounter, but it does not start processing.
> 
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
> 
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred
> 
> 
> 



Having problem with Spark streaming with Kinesis

2014-11-26 Thread A.K.M. Ashrafuzzaman
Hi guys,
When we are using Kinesis with 1 shard then it works fine. But when we use more 
than 1, it falls into an infinite loop and no data is processed by 
Spark Streaming. In the Kinesis DynamoDB table, I can see that it keeps increasing 
the leaseCounter, but it does not start processing.

I am using,
scala: 2.10.4
java version: 1.8.0_25
Spark: 1.1.0
spark-streaming-kinesis-asl: 1.1.0

A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred
