Thanks Cody for trying to understand the issue. Sorry if I am not being clear.
The scenario is to process all messages at once, in a single DStream block,
whenever the source system publishes messages. The source system publishes x
messages every 10 minutes.

By events I meant the total number of messages processed in each batch interval
(in my case 2000 ms) by the executor (the web UI shows each block's processing
as events).

DirectStream is processing only 10 messages per batch. It is the same whether
100 or 1 million messages are published.

The xyz topic has 20 partitions.
I am using the Kafka producer API to publish messages.
Below is the code that I am using:
val topics = "xyz"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
val k = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

k.foreachRDD { rdd =>
  val dstreamToRDD = rdd.cache()
  println(s"${System.currentTimeMillis} partitions: ${dstreamToRDD.partitions.length}")
  val accTran = dstreamToRDD.filter { ... }
  accTran.map { ... }
}

ssc.start()
ssc.awaitTermination()

I tried using DirectStream with map and partition, where I had an issue with
offsetRange. After your suggestion, the offset issue was resolved when I used
the DirectStream code above with the topic only.

The spark-submit settings that I am using are in the mail chain below.
Is there any bottleneck I am hitting that prevents processing the maximum
number of messages in one batch interval using the direct stream RDD?
If this is still not clear, I can take this offline and explain the scenario in
more detail.
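One way to see whether a rate limit is the bottleneck is to log the offset
ranges the direct stream pulls for each batch. A minimal sketch, assuming the
same `k` direct stream and the Spark 1.5 streaming-kafka API as in the code
above (illustrative only, since it needs a running streaming context):

```scala
// Sketch: log how many offsets each batch actually pulls from each partition.
// The cast must happen on the RDD produced directly by the direct stream,
// before any transformation.
import org.apache.spark.streaming.kafka.HasOffsetRanges

k.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"${r.topic} partition ${r.partition}: " +
      s"${r.untilOffset - r.fromOffset} messages (${r.fromOffset} -> ${r.untilOffset})")
  }
}
```

If every partition shows the same small count per batch, that points at a rate
limit rather than a data or partitioning problem.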

Sent from Samsung Mobile.







-------- Original message --------
From: Cody Koeninger <c...@koeninger.org>
Date: 06/02/2016 22:32 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

I am not at all clear on what you are saying.

"Yes , I am  printing  each  messages .  It is  processing all  messages under 
each  dstream block."  If it is processing all messages, what is the problem 
you are having?

"The issue is  with  Directsream processing 10 message per event. "  What 
distinction are you making between a message and an event?

"I am  expecting  Directsream to  process  1 million messages"   Your first 
email said you were publishing 100 messages but only processing 10.  Why are 
you now trying to process 1 million messages without understanding what is 
going on?  Make sure you can process a limited number of messages correctly 
first.  The first code examples you posted to the list had some pretty serious
errors (i.e. only trying to process 1 partition, trying to process offsets that
didn't exist).  Make sure that is all fixed first.

To be clear, I use direct Kafka RDDs to process batches with something like 4 GB of
messages per partition, you shouldn't be hitting some kind of limit with 1 
million messages per batch.  You may of course hit executor resource issues 
depending on what you're trying to do with each message, but that doesn't sound 
like the case here.

If you want help, either clarify what you are saying, or post a minimal 
reproducible code example, with expected output vs actual output.






On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
Cody,
Yes, I am printing each message. It is processing all messages under each
DStream block.

Source systems are publishing 1 million messages per 4 seconds, which is less
than the batch interval. The issue is with DirectStream processing 10 messages
per event. When the topic's partitions were increased to 20, DirectStream picks
up only 200 messages at a time for processing (I guess 10 for each partition).
I have 16 executors running for streaming (in both YARN client and cluster
mode).
I am expecting DirectStream to process the 1 million messages published in the
topic within a batch interval.

Using createStream, it could batch 150K messages and process them. createStream
is better than DirectStream in this case. But again, why only 150K?

Any clarification on DirectStream processing millions of messages per batch is
much appreciated.
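Working backwards from the numbers above gives a useful sanity check: for the
direct stream, messages per batch = effective per-partition rate limit x
partitions x batch seconds. A small sketch (the "effective rate" here is an
inferred value, not a configured one):

```scala
object RateCheck extends App {
  // Observed numbers from this thread
  val observedPerBatch = 200 // messages actually processed per batch
  val partitions       = 20  // partitions in the xyz topic
  val batchSeconds     = 2   // 2000 ms batch interval

  // messagesPerBatch = rate * partitions * batchSeconds,
  // so the implied effective rate limit is:
  val impliedRate = observedPerBatch / (partitions * batchSeconds)
  println(s"implied rate limit: $impliedRate records/sec/partition")
}
```

That works out to 5 records/sec/partition, far below the configured
maxRatePerPartition of 1,000,000, which suggests something else (for example
the backpressure controller) is acting as the limiter.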






-------- Original message --------
From: Cody Koeninger <c...@koeninger.org>
Date:06/02/2016 01:30 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

Have you tried just printing each message, to see which ones are being 
processed?

On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
I am able to see the number of messages processed per event in the Spark
Streaming web UI. I am also counting the messages inside foreachRDD.
I removed the settings for backpressure, but it is still the same.







-------- Original message --------
From: Cody Koeninger <c...@koeninger.org>
Date:06/02/2016 00:33 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

How are you counting the number of messages?

I'd go ahead and remove the settings for backpressure and maxRatePerPartition,
just to eliminate those as a variable.

On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
I am using one direct stream. Below is the call to createDirectStream:

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
val k = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

When I replace the createDirectStream call with createStream, all messages are
read by one DStream block:

val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181", "resp", topicMap, StorageLevel.MEMORY_ONLY)

I am using the below spark-submit to execute:

./spark-submit --master yarn-client \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.shuffle.service.enabled=true" \
  --conf "spark.sql.tungsten.enabled=false" \
  --conf "spark.sql.codegen=false" \
  --conf "spark.sql.unsafe.enabled=false" \
  --conf "spark.streaming.backpressure.enabled=true" \
  --conf "spark.locality.wait=1s" \
  --conf "spark.shuffle.consolidateFiles=true" \
  --conf "spark.streaming.kafka.maxRatePerPartition=1000000" \
  --driver-memory 2g --executor-memory 1g \
  --class com.tcs.dime.spark.SparkReceiver \
  --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml \
  --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar \
  /root/Jars/sparkreceiver.jar
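Following the suggestion in Cody's reply above to eliminate the rate-related
settings as variables, a stripped-down version of this submit might look like
the following (a sketch: the same command with only the backpressure and
maxRatePerPartition settings removed):

```shell
./spark-submit --master yarn-client \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.shuffle.service.enabled=true" \
  --conf "spark.sql.tungsten.enabled=false" \
  --conf "spark.sql.codegen=false" \
  --conf "spark.sql.unsafe.enabled=false" \
  --conf "spark.locality.wait=1s" \
  --conf "spark.shuffle.consolidateFiles=true" \
  --driver-memory 2g --executor-memory 1g \
  --class com.tcs.dime.spark.SparkReceiver \
  --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml \
  --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar \
  /root/Jars/sparkreceiver.jar
```

If the direct stream then pulls full batches, the limiter was one of those two
settings.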






-------- Original message --------
From: Cody Koeninger <c...@koeninger.org>
Date:05/02/2016 22:07 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

If you're using the direct stream, you have 0 receivers.  Do you mean you have 
1 executor?

Can you post the relevant call to createDirectStream from your code, as well as 
any relevant spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
Adding more info

The batch interval is 2000 ms.
I expect all 100 messages to go through one DStream from the direct stream, but
it receives them at a rate of 10 messages at a time. Am I missing some
configuration here? Any help is appreciated.

Regards 
Diwakar.




-------- Original message --------
From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Date:05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org 
Cc:
Subject: Kafka directsream  receiving rate

Hi,
I am using Spark 1.5.1.
I have a topic with 20 partitions. When I publish 100 messages, the Spark
direct stream receives 10 messages per DStream. I have only one receiver.
When I used createStream, the receiver received the entire 100 messages at
once.

Appreciate  any  help .

Regards 
Diwakar





