In Spark Streaming 1.3 -

Say I have 10 executors, each with 4 cores, so in total 40 parallel tasks at
once. If I repartition the Kafka direct stream to 40 partitions vs. say the
Kafka topic has 300 partitions - which one will be more efficient? Should I
repartition the Kafka stream to equal the number of cores, or keep it the
same as 300?

If I have more partitions than parallel tasks, will that not cause task
scheduling overhead?
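
For concreteness, a rough sketch of the repartition option (the stream name
directKafkaStream is assumed from my code):

// Shuffle the 300 Kafka partitions down to 40, one per available core.
// repartition adds a full shuffle every batch, so it only pays off if
// the per-record processing cost dominates the shuffle cost.
JavaPairDStream<byte[], byte[]> repartitioned =
    directKafkaStream.repartition(40); // 10 executors x 4 cores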

On Wed, Jul 22, 2015 at 11:37 AM, Tathagata Das <t...@databricks.com> wrote:

> For Java, do
>
> OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>
> If you fix that error, you should be seeing data.
>
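> In full, a minimal sketch of the corrected loop (the stream name is assumed
> from your code):
>
> directKafkaStream.foreachRDD(
>     new Function<JavaPairRDD<byte[], byte[]>, Void>() {
>       @Override
>       public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
>         // rdd.rdd() unwraps the underlying Scala KafkaRDD, which
>         // implements HasOffsetRanges; the JavaPairRDD wrapper does not.
>         OffsetRange[] offsetRanges =
>             ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>         for (OffsetRange o : offsetRanges) {
>           System.out.println(o.topic() + " " + o.partition() + ": "
>               + o.fromOffset() + " -> " + o.untilOffset());
>         }
>         return null;
>       }
>     });
>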
> You can call arbitrary RDD operations on a DStream, using
> DStream.transform. Take a look at the docs.
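> For example, a rough sketch of a per-batch coalesce done that way (the
> partition count here is assumed):
>
> JavaPairDStream<byte[], byte[]> coalesced = directKafkaStream.transformToPair(
>     new Function<JavaPairRDD<byte[], byte[]>, JavaPairRDD<byte[], byte[]>>() {
>       @Override
>       public JavaPairRDD<byte[], byte[]> call(JavaPairRDD<byte[], byte[]> rdd) {
>         // coalesce shrinks the partition count without a full shuffle
>         return rdd.coalesce(40);
>       }
>     });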
>
> For the direct Kafka approach you are doing:
> - tasks do get launched for empty partitions
> - the driver may make multiple calls to the Kafka brokers to get all the
> offset information. But that does not mean you should reduce partitions.
> The whole point of having a large number of partitions is to consume the
> data in parallel. If you reduce the number of partitions, that defeats the
> purpose of having partitions at all. And the driver making calls to get
> metadata (i.e. offsets) isn't very costly, nor is it usually a bottleneck.
> Rather, receiving and processing the actual data is usually the bottleneck,
> and to increase throughput you should have a larger number of partitions.
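>
> For context, a sketch of where those partitions come from (the decoder and
> parameter names are assumed, per the 1.3 direct API):
>
> // Each Kafka partition maps 1:1 to a Spark partition, so a
> // 300-partition topic yields 300 tasks per batch.
> JavaPairDStream<byte[], byte[]> stream = KafkaUtils.createDirectStream(
>     jssc, byte[].class, byte[].class,
>     DefaultDecoder.class, DefaultDecoder.class, kafkaParams, topics);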
>
>
>
> On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> I'd suggest upgrading to 1.4 as it has better metrics and UI.
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Is coalesce not applicable to a Kafka stream? How do I do a coalesce on
>>> a Kafka direct stream? It's not in the API.
>>> Will calling repartition on the direct stream, with the number of
>>> executors as numPartitions, improve performance?
>>>
>>> In 1.3, do tasks get launched for partitions which are empty? Does the
>>> driver make a separate call to get the offsets of each partition, or does
>>> it get all partitions' new offsets in a single call? I mean, will reducing
>>> the number of partitions in Kafka help improve the performance?
>>>
>>> On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> 1. I am using Spark Streaming 1.3 for reading from a Kafka queue and
>>>> pushing events to an external source.
>>>>
>>>> I passed 20 executors in my job but it is showing only 6 in the executor
>>>> tab. When I used high-level streaming in 1.2, it showed 20 executors. My
>>>> cluster is a 10-node YARN cluster where each node has 8 cores.
>>>>
>>>> I am calling the script as:
>>>>
>>>> spark-submit --class classname --num-executors 10 --executor-cores 2
>>>> --master yarn-client jarfile
>>>>
>>>> 2. On Streaming UI
>>>>
>>>> Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
>>>> Time since start: 13 minutes 28 seconds
>>>> Network receivers: 0
>>>> Batch interval: 1 second
>>>> Processed batches: 807
>>>> Waiting batches: 0
>>>> Received records: 0
>>>> Processed records: 0
>>>>
>>>> Received records and processed records are always 0, and the speed of
>>>> processing is slow compared to the high-level API.
>>>>
>>>> I am processing the stream using mapPartitions.
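>>>>
>>>> For reference, a rough sketch of that processing (the sink call
>>>> pushToExternalSource is hypothetical):
>>>>
>>>> directKafkaStream.mapPartitions(
>>>>     new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, String>() {
>>>>       @Override
>>>>       public Iterable<String> call(Iterator<Tuple2<byte[], byte[]>> it) {
>>>>         List<String> results = new ArrayList<String>();
>>>>         while (it.hasNext()) {
>>>>           // push each record's value to the external source
>>>>           results.add(pushToExternalSource(it.next()._2()));
>>>>         }
>>>>         return results;
>>>>       }
>>>>     });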
>>>>
>>>> When I used
>>>>
>>>> directKafkaStream.foreachRDD(
>>>>     new Function<JavaPairRDD<byte[], byte[]>, Void>() {
>>>>       @Override
>>>>       public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
>>>>         OffsetRange[] offsetRanges =
>>>>             ((HasOffsetRanges) rdd).offsetRanges();
>>>>         return null;
>>>>       }
>>>>     });
>>>>
>>>> It throws an exception:
>>>> java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD
>>>> cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>>>>
>>>> Thanks
>>>> Shushant
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
