1. Here you are basically creating 2 receivers and asking each of them to
consume 3 kafka partitions each.

- In 1.2 we have high-level consumers, so how can we restrict the number of Kafka
partitions to consume from? Say I have 300 partitions in a Kafka topic
and, as above, I configure 2 receivers with 3 Kafka partitions each. Does that mean
I will read from only 6 of the 300 partitions, and the data in the remaining 294
partitions is lost?
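
For reference, as far as I understand the high-level consumer, the number in topicsMap is a count of consumer threads per receiver, not a cap on partitions: Kafka spreads all partitions of the topic over every consumer thread that joins the same group.id. Below is a minimal sketch of that arithmetic, assuming the default range-style assignment and that all 2 x 3 threads join the same group. The class and method names are made up for illustration, and this is not Kafka's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Splits partitions 0..numPartitions-1 into contiguous ranges, one per
    // consumer thread. The first (numPartitions % numThreads) threads get
    // one extra partition, which mirrors range-style assignment.
    static List<List<Integer>> assign(int numPartitions, int numThreads) {
        int perThread = numPartitions / numThreads;
        int extra = numPartitions % numThreads;
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int t = 0; t < numThreads; t++) {
            int count = perThread + (t < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 2 receivers x 3 threads each = 6 consumer threads in one group.
        List<List<Integer>> assignment = assign(300, 2 * 3);
        for (int t = 0; t < assignment.size(); t++) {
            System.out.println("thread " + t + " owns "
                + assignment.get(t).size() + " partitions");
        }
    }
}
```

Under those assumptions each of the 6 threads would own 50 partitions, so nothing is skipped; the 6 only limits how many threads share the work.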


2. One more doubt: in Spark Streaming, how is it decided which part of the
driver's main function runs at each batch interval? Since the whole code is
written in one function (the driver's main function), how is it determined that
the Kafka stream receivers are not registered again in each batch and only the
processing is done?
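
For reference, the usual explanation is that main runs only once: it declares the receivers and the processing steps, and after start() the scheduler re-runs only the registered processing functions for every batch. Below is a toy model of that split. ToyContext and its methods are hypothetical stand-ins written for this sketch, not Spark's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchLoopSketch {

    // Hypothetical stand-in for a streaming context; it only mimics the
    // "declare once, run per batch" shape.
    static class ToyContext {
        int receiversStarted = 0;
        final List<Consumer<Integer>> outputOps = new ArrayList<>();

        // Called from the setup code: happens once per call.
        void registerReceiver() {
            receiversStarted++;
        }

        // Stores the function; it is not executed at registration time.
        void foreachBatch(Consumer<Integer> op) {
            outputOps.add(op);
        }

        // Plays the role of the scheduler: only the stored functions run
        // for each batch, the setup code is never re-executed.
        void runBatches(int n) {
            for (int batch = 0; batch < n; batch++) {
                for (Consumer<Integer> op : outputOps) {
                    op.accept(batch);
                }
            }
        }
    }

    // Returns {receivers started, batches processed}.
    static int[] demo(int batches) {
        ToyContext ctx = new ToyContext();
        int[] processed = {0};
        ctx.registerReceiver();                     // setup, runs once
        ctx.registerReceiver();                     // setup, runs once
        ctx.foreachBatch(batch -> processed[0]++);  // body runs per batch
        ctx.runBatches(batches);
        return new int[] {ctx.receiversStarted, processed[0]};
    }

    public static void main(String[] args) {
        int[] r = demo(5);
        System.out.println("receivers started: " + r[0]
            + ", batches processed: " + r[1]);
    }
}
```

In the real application the functions passed to map, print and similar calls play the role of the stored lambda, while the createStream calls are part of the one-time setup.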






On Mon, Jun 29, 2015 at 7:35 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> Let me take a shot at your questions. (I am sure people like Cody and TD
> will correct me if I am wrong.)
>
> 0. This is an exact copy of the answer to a similar question in a mail
> thread, from Akhil D:
> Since you set local[4] you will have 4 threads for your computation, and
> since you have 2 receivers, you are left with 2 threads to process.
> In (0 + 2), the 2 is your 2 threads, and the other / 2 means you
> have 2 tasks in that stage (with id 0).
>
> 1. Here you are basically creating 2 receivers and asking each of them to
> consume 3 kafka partitions each.
> 2. How does that matter? It depends on how many receivers you have created
> to consume that data and whether you have repartitioned it. Remember, Spark is
> lazy and executors are related to the context.
> 3. I think in Java the factory method is fixed. You just pass around the
> contextFactory object. (I love Python :) see, the signature is so much
> cleaner :) )
> 4. Yes, if you use Spark checkpointing. You can use your custom
> checkpointing too.
>
> Best
> Ayan
>
>
>
> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <shushantaror...@gmail.com
> > wrote:
>
>> A few doubts:
>>
>> In 1.2 streaming, when I use a union of streams, my streaming application
>> sometimes hangs and nothing gets printed on the driver.
>>
>>
>> [Stage 2:>                                     (0 + 2) / 2]
>>
>> What does (0 + 2) / 2 signify here?
>>
>>
>>
>> 1. Does the number of streams given in topicsMap.put("testSparkPartitioned", 3)
>> have to be the same as numStreams = 2 in the unioned stream?
>>
>> 2. I launched the app on the YARN RM with num-executors set to 5. It created 2
>> receivers and 5 executors. In streaming, the receiver nodes are fixed at the
>> start of the app for its whole lifetime. Do executors get allocated at the
>> start of each job, on every 1 s batch interval? If yes, how is it fast enough
>> to allocate resources? I mean, if I increase num-executors to 50, it will
>> negotiate 50 executors from the YARN RM at the start of each job, so does
>> allocating executors take longer than the batch interval (here 1 s, or say
>> 500 ms)? Can I fix the processing executors for the lifetime of the app as well?
>>
>>
>>
>>
>> SparkConf conf = new SparkConf().setAppName("SampleSparkStreamingApp");
>> JavaStreamingContext jssc =
>>     new JavaStreamingContext(conf, Durations.milliseconds(1000));
>>
>> Map<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("zookeeper.connect", "ipadd:2181");
>> kafkaParams.put("group.id", "testgroup");
>> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
>>
>> Map<String, Integer> topicsMap = new HashMap<String, Integer>();
>> topicsMap.put("testSparkPartitioned", 3);
>>
>> int numStreams = 2;
>> List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
>>     new ArrayList<JavaPairDStream<byte[], byte[]>>();
>> for (int i = 0; i < numStreams; i++) {
>>     kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
>>         kafka.serializer.DefaultDecoder.class,
>>         kafka.serializer.DefaultDecoder.class,
>>         kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>> }
>>
>> JavaPairDStream<byte[], byte[]> directKafkaStream =
>>     jssc.union(kafkaStreams.get(0),
>>         kafkaStreams.subList(1, kafkaStreams.size()));
>>
>> JavaDStream<String> lines = directKafkaStream.map(
>>     new Function<Tuple2<byte[], byte[]>, String>() {
>>         public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>>             ...processing
>>             ..return msg;
>>         }
>>     });
>> lines.print();
>> jssc.start();
>> jssc.awaitTermination();
>>
>>
>>
>>
>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>> 3. To avoid data loss when we use checkpointing and a factory method to
>> create the SparkContext, is the method name fixed,
>> or can we use any name? And how do we set in the app which method name to use?
>>
>> 4. In 1.3 non-receiver-based streaming, the Kafka offset is not stored in
>> ZooKeeper. Is that because ZooKeeper is not efficient for heavy writes and
>> its reads are not strictly consistent? So
>>
>>  we use simple Kafka API that does not use Zookeeper and offsets tracked
>> only by Spark Streaming within its checkpoints. This eliminates
>> inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each
>> record is received by Spark Streaming effectively exactly once despite
>> failures.
>>
>> So do we have to call context.checkpoint(hdfsdir)? Or is there an implicit
>> checkpoint location? That is, is HDFS used for such small data (just offsets)?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> There is another option to try: the Receiver Based Low Level Kafka
>>> Consumer, which is part of Spark-Packages (
>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer). This
>>> can be used with the WAL as well for end-to-end zero data loss.
>>>
>>> This is also a Reliable Receiver and commits offsets to ZK. Given the
>>> number of Kafka partitions you have (> 100), using the High Level Kafka API
>>> for the receiver-based approach may lead to issues related to consumer
>>> re-balancing, which is a major issue of the Kafka High Level API.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>>
>>>
>>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> In the receiver-based approach, if the receiver crashes for any reason
>>>> (receiver crashed or executor crashed), the receiver should get restarted on
>>>> another executor and should start reading data from the offset present in
>>>> ZooKeeper. There is some chance of data loss, which can be alleviated using
>>>> Write Ahead Logs (see the streaming programming guide for more details, or see
>>>> my talk [Slides PDF
>>>> <http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx>
>>>> , Video
>>>> <https://www.youtube.com/watch?v=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4>
>>>> ] from last Spark Summit 2015). But that approach can give duplicate
>>>> records. The direct approach gives exactly-once guarantees, so you should
>>>> try it out.
>>>>
>>>> TD
>>>>
>>>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Read the Spark Streaming guide and the Kafka integration guide for a
>>>>> better understanding of how the receiver-based stream works.
>>>>>
>>>>> Capacity planning is specific to your environment and what the job is
>>>>> actually doing; you'll need to determine it empirically.
>>>>>
>>>>>
>>>>> On Friday, June 26, 2015, Shushant Arora <shushantaror...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> In 1.2, how do I handle offset management in each job after the
>>>>>> streaming application starts? Should I commit the offset manually after
>>>>>> job completion?
>>>>>>
>>>>>> And what is the recommended number of consumer threads? Say I have 300
>>>>>> partitions in the Kafka cluster. Load is ~1 million events per second. Each
>>>>>> event is ~500 bytes. Is having 5 receivers with 60 partitions per
>>>>>> receiver sufficient for Spark Streaming to consume?
>>>>>>
>>>>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger <c...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> The receiver-based kafka createStream in spark 1.2 uses zookeeper to
>>>>>>> store offsets.  If you want finer-grained control over offsets, you can
>>>>>>> update the values in zookeeper yourself before starting the job.
>>>>>>>
>>>>>>> createDirectStream in spark 1.3 is still marked as experimental, and
>>>>>>> subject to change.  That being said, it works better for me in
>>>>>>> production than the receiver based api.
>>>>>>>
>>>>>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I am using spark streaming 1.2.
>>>>>>>>
>>>>>>>> If the processing executors crash, will the receiver reset the offset
>>>>>>>> back to the last processed offset?
>>>>>>>>
>>>>>>>> If the receiver itself crashes, is there a way to reset the offset
>>>>>>>> without restarting the streaming application, other than to smallest
>>>>>>>> or largest?
>>>>>>>>
>>>>>>>>
>>>>>>>> Is Spark Streaming 1.3, which uses the low-level consumer API, stable?
>>>>>>>> And which is recommended for handling data loss, 1.2 or 1.3?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
