Thanks Cody for trying to understand the issue.
Sorry if I am not being clear.
The scenario is to process all messages at once, in a single dstream block,
whenever the source system publishes messages. The source system publishes x
messages once every 10 minutes.
By events I meant the total number of messages processed in each batch interval
(in my case 2000 ms) by the executor (the web UI shows each block's processing
as events).
DirectStream is processing only 10 messages per batch. It is the same whether
100 or 1 million messages are published.
The xyz topic has 20 partitions.
I am using the Kafka producer API to publish messages.
Below is the code that I am using:
val topics = "xyz"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" ->
  "datanode4.isdp.com:9092")
val k =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
    kafkaParams, topicSet)
k.foreachRDD { rdd =>
  val dstreamToRDD = rdd.cache()
  println(System.currentTimeMillis() + " " + dstreamToRDD.partitions.length)
  val accTran = dstreamToRDD.filter { ... }
  accTran.map { ... }
}
ssc.start()
ssc.awaitTermination()
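To see how many messages actually land in each batch (rather than relying on the web UI's "events" number), a minimal sketch is to count the batch RDD inside foreachRDD. This assumes the same ssc context and k stream as in the code above:

```scala
// Sketch, assuming the `k` direct stream defined above.
k.foreachRDD { rdd =>
  // count() forces evaluation of the entire batch RDD, so this is the
  // true per-batch message total, independent of the UI's event display.
  val n = rdd.count()
  println(s"${System.currentTimeMillis()} batch count = $n, " +
    s"partitions = ${rdd.partitions.length}")
}
```

If this number also stays at ~10 per partition while far more messages sit in the topic, the limit is coming from configuration rather than from the processing code.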
I tried using DirectStream with map & partition, where I had an issue with
offsetRange. After your suggestion, the offset issue was resolved when I
used the above DirectStream code with the topic only.
The spark-submit settings that I am using are in the mail chain below.
Is there any bottleneck I am hitting that prevents processing the maximum
number of messages in one batch interval using the direct stream RDD?
If this is not clear, I can take this offline and explain the
scenario briefly.
Sent from Samsung Mobile.
-------- Original message --------
From: Cody Koeninger <[email protected]>
Date: 06/02/2016 22:32 (GMT+05:30)
To: Diwakar Dhanuskodi <[email protected]>
Cc: [email protected]
Subject: Re: Kafka directsream receiving rate

I am not at all clear on what you are saying.
"Yes , I am printing each messages . It is processing all messages under
each dstream block." If it is processing all messages, what is the problem
you are having?
"The issue is with Directsream processing 10 message per event. " What
distinction are you making between a message and an event?
"I am expecting Directsream to process 1 million messages" Your first
email said you were publishing 100 messages but only processing 10. Why are
you now trying to process 1 million messages without understanding what is
going on? Make sure you can process a limited number of messages correctly
first. The first code examples you posted to the list had some pretty serious
errors (ie only trying to process 1 partition, trying to process offsets that
didn't exist). Make sure that is all fixed first.
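One way to confirm the offsets are fixed is to log the offset range each batch actually consumes. A minimal sketch, assuming the spark-streaming-kafka 0.8 direct API used in this thread and a direct stream named `stream`:

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Sketch: log the Kafka offset range consumed per partition in each batch.
// This must be done on the direct stream's own RDDs, before any shuffle,
// because only those RDDs carry offset information.
stream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"topic=${r.topic} partition=${r.partition} " +
      s"from=${r.fromOffset} until=${r.untilOffset} count=${r.count}")
  }
}
```

Comparing these counts against what the producer published makes it obvious whether messages are being skipped or merely throttled.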
To be clear, I use direct Kafka RDDs to process batches with like 4gb of
messages per partition, so you shouldn't be hitting some kind of limit with 1
million messages per batch. You may of course hit executor resource issues
depending on what you're trying to do with each message, but that doesn't sound
like the case here.
If you want help, either clarify what you are saying, or post a minimal
reproducible code example, with expected output vs actual output.
On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi
<[email protected]> wrote:
Cody,
Yes, I am printing each message. It is processing all messages under
each dstream block.
Source systems are publishing 1 million messages per 4 secs, which is less than
the batch interval. The issue is with DirectStream processing 10 messages per
event. When partitions were increased to 20 in the topic, DirectStream picks up
only 200 messages (I guess 10 for each partition) at a time for processing.
I have 16 executors running for streaming (in both yarn client & cluster
mode).
I am expecting DirectStream to process the 1 million messages that are published
to the topic within the batch interval.
Using createStream, it could batch 150K messages and process them. createStream
is better than DirectStream in this case. Again, why only 150K?
Any clarification is much appreciated on directStream processing millions
per batch.
Sent from Samsung Mobile.
-------- Original message --------
From: Cody Koeninger <[email protected]>
Date:06/02/2016 01:30 (GMT+05:30)
To: Diwakar Dhanuskodi <[email protected]>
Cc: [email protected]
Subject: Re: Kafka directsream receiving rate
Have you tried just printing each message, to see which ones are being
processed?
On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi
<[email protected]> wrote:
I am able to see the number of messages processed per event in the Spark
Streaming web UI. Also I am counting the messages inside foreachRDD.
Removed the settings for backpressure, but still the same.
Sent from Samsung Mobile.
-------- Original message --------
From: Cody Koeninger <[email protected]>
Date:06/02/2016 00:33 (GMT+05:30)
To: Diwakar Dhanuskodi <[email protected]>
Cc: [email protected]
Subject: Re: Kafka directsream receiving rate
How are you counting the number of messages?
I'd go ahead and remove the settings for backpressure and maxRatePerPartition,
just to eliminate that as a variable.
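Concretely, that would mean a stripped-down submit along these lines (a sketch based on the spark-submit from earlier in this thread, with the two rate-related confs dropped; all paths and the class name are as given there):

```shell
# Same submit as in the mail chain, minus
# spark.streaming.backpressure.enabled and
# spark.streaming.kafka.maxRatePerPartition,
# so the direct stream's batch sizes are unthrottled.
./spark-submit --master yarn-client \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.shuffle.service.enabled=true" \
  --driver-memory 2g --executor-memory 1g \
  --class com.tcs.dime.spark.SparkReceiver \
  --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar \
  /root/Jars/sparkreceiver.jar
```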
On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi
<[email protected]> wrote:
I am using one direct stream. Below is the call to createDirectStream:
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" ->
"datanode4.isdp.com:9092")
val k =
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,
kafkaParams, topicSet)
When I replace the DirectStream call with createStream, all messages are
read by one dstream block:
val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap
,StorageLevel.MEMORY_ONLY)
I am using the below spark-submit to execute:
./spark-submit --master yarn-client --conf
"spark.dynamicAllocation.enabled=true" --conf
"spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false"
--conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf
"spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s"
--conf "spark.shuffle.consolidateFiles=true" --conf
"spark.streaming.kafka.maxRatePerPartition=1000000" --driver-memory 2g
--executor-memory 1g --class com.tcs.dime.spark.SparkReceiver --files
/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
--jars
/root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar
/root/Jars/sparkreceiver.jar
Sent from Samsung Mobile.
-------- Original message --------
From: Cody Koeninger <[email protected]>
Date:05/02/2016 22:07 (GMT+05:30)
To: Diwakar Dhanuskodi <[email protected]>
Cc: [email protected]
Subject: Re: Kafka directsream receiving rate
If you're using the direct stream, you have 0 receivers. Do you mean you have
1 executor?
Can you post the relevant call to createDirectStream from your code, as well as
any relevant spark configuration?
On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi
<[email protected]> wrote:
Adding more info:
Batch interval is 2000 ms.
I expect all 100 messages to go through one dstream from the direct stream, but
it receives at a rate of 10 messages at a time. Am I missing some configuration
here? Any help appreciated.
Regards
Diwakar.
Sent from Samsung Mobile.
-------- Original message --------
From: Diwakar Dhanuskodi <[email protected]>
Date:05/02/2016 07:33 (GMT+05:30)
To: [email protected]
Cc:
Subject: Kafka directsream receiving rate
Hi,
Using spark 1.5.1.
I have a topic with 20 partitions. When I publish 100 messages, the Spark
direct stream receives 10 messages per dstream. I have only one receiver.
When I used createStream, the receiver received the entire 100 messages at once.
Appreciate any help.
Regards
Diwakar
Sent from Samsung Mobile.