In Spark Streaming 1.3, say I have 10 executors, each with 4 cores, so 40 tasks can run in parallel at once. If I repartition the Kafka direct stream to 40 partitions, versus keeping the 300 partitions my Kafka topic has, which one will be more efficient? Should I repartition the Kafka stream to the number of cores, or keep it at 300?
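One rough way to think about the 40-vs-300 question is to count scheduling waves. The sketch below is a simplified model, not how Spark's scheduler actually behaves: it assumes every task takes the same time and that exactly 10 executors × 4 cores = 40 tasks run at once, and the class and method names are hypothetical. Under that model, 300 partitions need 8 waves per batch with the last wave only half full, while 40 partitions need one. Keep in mind that `repartition` shuffles every record, and the stage that reads from Kafka still has one task per Kafka partition, because the direct stream maps each Kafka partition to one RDD partition.

```java
// Simplified model (assumption): all tasks take the same time and the
// cluster runs exactly `totalCores` tasks concurrently. Treat the output
// as a rough guide, not a prediction of real batch times.
class TaskWaves {

    // Number of scheduling "waves" needed to run one task per partition.
    static int waves(int partitions, int totalCores) {
        if (partitions <= 0 || totalCores <= 0) {
            throw new IllegalArgumentException("partitions and cores must be positive");
        }
        // Integer ceiling division: ceil(partitions / totalCores).
        return (partitions + totalCores - 1) / totalCores;
    }

    // Fraction of task slots that are busy during the last wave.
    static double lastWaveUtilization(int partitions, int totalCores) {
        int remainder = partitions % totalCores;
        return remainder == 0 ? 1.0 : (double) remainder / totalCores;
    }

    public static void main(String[] args) {
        int executors = 10;
        int coresPerExecutor = 4;
        int totalCores = executors * coresPerExecutor; // 40 concurrent tasks

        for (int partitions : new int[] {40, 300}) {
            System.out.printf("%d partitions on %d cores: %d wave(s), last wave %.0f%% busy%n",
                    partitions, totalCores,
                    waves(partitions, totalCores),
                    100 * lastWaveUtilization(partitions, totalCores));
        }
    }
}
```

The model leaves out the shuffle that `repartition(40)` would add, which can cost more than the waves it saves.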
If I have more partitions than parallel tasks, won't that cause task-scheduling overhead?

On Wed, Jul 22, 2015 at 11:37 AM, Tathagata Das <t...@databricks.com> wrote:

> For Java, do
>
> OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>
> If you fix that error, you should be seeing data.
>
> You can call arbitrary RDD operations on a DStream using
> DStream.transform. Take a look at the docs.
>
> For the direct Kafka approach you are using:
> - tasks do get launched for empty partitions
> - the driver may make multiple calls to the Kafka brokers to get all the
>   offset information.
>
> But that does not mean you should reduce partitions. The whole point of
> having a large number of partitions is to consume the data in parallel. If
> you reduce the number of partitions, that defeats the purpose of having
> partitions at all. And the driver making calls to get metadata (i.e.
> offsets) isn't very costly, nor is it usually a bottleneck. Rather,
> receiving and processing the actual data is usually the bottleneck, and to
> increase throughput you should have a larger number of partitions.
>
> On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> I'd suggest upgrading to 1.4, as it has better metrics and UI.
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> Is coalesce not applicable to the Kafka stream? How do I coalesce a Kafka
>>> direct stream? It's not in the API.
>>> Will calling repartition on the direct stream, with the number of
>>> executors as numPartitions, improve performance?
>>>
>>> In 1.3, do tasks get launched for partitions that are empty? Does the
>>> driver make a separate call to get the offsets of each partition, or does
>>> it get the new offsets of all partitions in a single call? In other words,
>>> will reducing the number of partitions in Kafka help improve performance?
>>>
>>> On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> 1. I am using Spark Streaming 1.3 to read from a Kafka queue and push
>>>> events to an external source.
>>>>
>>>> I passed 20 executors to my job, but the Executors tab shows only 6.
>>>> When I used the high-level streaming API in 1.2, it showed 20 executors.
>>>> My cluster is a 10-node YARN cluster, and each node has 8 cores.
>>>>
>>>> I am calling the script as:
>>>>
>>>> spark-submit --class classname --num-executors 10 --executor-cores 2 --master yarn-client jarfile
>>>>
>>>> 2. On the Streaming UI:
>>>>
>>>> Started at: Mon Jul 20 11:02:10 GMT+00:00 2015
>>>> Time since start: 13 minutes 28 seconds
>>>> Network receivers: 0
>>>> Batch interval: 1 second
>>>> Processed batches: 807
>>>> Waiting batches: 0
>>>> Received records: 0
>>>> Processed records: 0
>>>>
>>>> Received records and processed records are always 0, and processing is
>>>> slow compared to the high-level API.
>>>>
>>>> I am processing the stream using mapPartitions.
>>>>
>>>> When I used
>>>>
>>>> directKafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
>>>>     @Override
>>>>     public Void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
>>>>         // TODO Auto-generated method stub
>>>>         OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
>>>>     }
>>>> }
>>>>
>>>> it throws an exception:
>>>>
>>>> java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD
>>>> cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>>>>
>>>> Thanks
>>>> Shushant
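The `ClassCastException` in the original message happens because, in the Java API, `JavaPairRDD` is a wrapper around an underlying Scala RDD, and only that underlying RDD (the one created by the Kafka direct stream) implements `HasOffsetRanges`; hence Tathagata's fix, `((HasOffsetRanges) rdd.rdd()).offsetRanges()`. The self-contained sketch below reproduces the same failure and fix with hypothetical stand-in classes (`HasOffsets`, `ScalaRdd`, `KafkaRddLike`, `JavaPairRddLike`) so it runs without Spark on the classpath; none of them are Spark's real types, and offset ranges are simplified to strings.

```java
// Hypothetical, simplified stand-ins for Spark types (NOT the real API):
//   HasOffsets      ~ HasOffsetRanges
//   ScalaRdd        ~ the underlying Scala RDD
//   KafkaRddLike    ~ the direct stream's RDD, which carries offset ranges
//   JavaPairRddLike ~ JavaPairRDD, a Java wrapper exposing rdd()
interface HasOffsets {
    String[] offsetRanges();
}

class ScalaRdd { }

class KafkaRddLike extends ScalaRdd implements HasOffsets {
    private final String[] ranges;

    KafkaRddLike(String... ranges) {
        this.ranges = ranges;
    }

    @Override
    public String[] offsetRanges() {
        return ranges;
    }
}

class JavaPairRddLike {
    private final ScalaRdd underlying;

    JavaPairRddLike(ScalaRdd underlying) {
        this.underlying = underlying;
    }

    // Plays the role of JavaPairRDD.rdd(): returns the wrapped RDD.
    ScalaRdd rdd() {
        return underlying;
    }
}

class CastDemo {
    // Fails at runtime: the wrapper itself does not implement HasOffsets.
    static String[] wrong(JavaPairRddLike wrapper) {
        return ((HasOffsets) wrapper).offsetRanges();
    }

    // Works: unwrap first, then cast the underlying RDD.
    static String[] right(JavaPairRddLike wrapper) {
        return ((HasOffsets) wrapper.rdd()).offsetRanges();
    }

    public static void main(String[] args) {
        JavaPairRddLike rdd = new JavaPairRddLike(new KafkaRddLike("events-0 [100, 120)"));
        try {
            wrong(rdd);
        } catch (ClassCastException e) {
            System.out.println("casting the wrapper fails: " + e.getClass().getSimpleName());
        }
        System.out.println("casting rdd() works: " + right(rdd)[0]);
    }
}
```

Two notes on the real Spark 1.3 code: the cast only succeeds on the RDD produced directly by the direct stream (the first method called on it), not after a transformation such as `mapPartitions` or `repartition`; and because the callback is a `Function<..., Void>`, its `call` method must end with `return null;` to compile.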
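Tathagata's point that tasks are launched even for empty partitions follows from the direct approach mapping each Kafka partition to exactly one RDD partition. The sketch below models one batch under that assumption. `OffsetRangeSketch` only mirrors the four fields of Spark's `OffsetRange` (`topic`, `partition`, `fromOffset`, `untilOffset`) and is not the Spark class; `DirectBatchPlan`, the topic name `events`, and the offsets are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in mirroring OffsetRange's fields; NOT the Spark class.
final class OffsetRangeSketch {
    final String topic;
    final int partition;
    final long fromOffset;  // inclusive
    final long untilOffset; // exclusive

    OffsetRangeSketch(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    long count() {
        return untilOffset - fromOffset;
    }
}

class DirectBatchPlan {
    // Assumption: one RDD partition (so one task) per Kafka partition,
    // created whether or not that partition received new messages.
    static List<OffsetRangeSketch> plan(String topic,
                                        Map<Integer, Long> consumed,
                                        Map<Integer, Long> latest) {
        List<OffsetRangeSketch> ranges = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : latest.entrySet()) {
            int partition = e.getKey();
            // Assumption: a partition with no recorded offset starts at 0.
            long from = consumed.getOrDefault(partition, 0L);
            ranges.add(new OffsetRangeSketch(topic, partition, from, e.getValue()));
        }
        return ranges;
    }

    // Tasks that will run but read no messages in this batch.
    static long emptyTasks(List<OffsetRangeSketch> ranges) {
        return ranges.stream().filter(r -> r.count() == 0).count();
    }

    public static void main(String[] args) {
        Map<Integer, Long> consumed = new TreeMap<>();
        Map<Integer, Long> latest = new TreeMap<>();
        // Made-up offsets for a 4-partition topic; partitions 1 and 2 got nothing new.
        long[][] offsets = {{100, 120}, {50, 50}, {70, 70}, {0, 5}};
        for (int p = 0; p < offsets.length; p++) {
            consumed.put(p, offsets[p][0]);
            latest.put(p, offsets[p][1]);
        }
        List<OffsetRangeSketch> batch = plan("events", consumed, latest);
        System.out.println("tasks: " + batch.size() + ", empty tasks: " + emptyTasks(batch));
    }
}
```

Per Tathagata's reply, these empty tasks and the driver's offset lookups are usually cheap compared with receiving and processing the actual data.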