On Wed, May 20, 2015 at 1:12 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
> So I can explicitly specify the number of receivers and executors in
> receiver-based streaming? Can you share a sample program if any?

- You can look at the low-level consumer repo
<https://github.com/dibbhatt/kafka-spark-consumer> shared by Dibyendu for
sample code.

> Also in low-level non-receiver based streaming, will data be fetched and
> processed by the same worker executor node? And if I have concurrent jobs
> set to 1, then fetching and processing will be delayed till the next job
> starts. Say I have a 1 sec stream interval but job1 takes 5 sec to
> complete, so job2 starts at the end of sec 5. Will it then process all
> data from sec1 to sec5, or only for interval sec1-sec2?
>
> And if it processes data for the complete duration sec1-sec5, is there
> any option to suppress the start of the other queued jobs (for intervals
> sec2-3, sec3-4, sec4-5), since their work is already done by job2?

- I believe all your data from sec2-sec5 will be available in Kafka, and
when the second batch starts at sec 5 it will consume it (you can also
limit the rate with spark.streaming.kafka.maxRatePerPartition). Read more
here: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

On Wed, May 20, 2015 at 12:36 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

>> One receiver basically runs on 1 core, so if your single node has 4
>> cores, there are still 3 cores left for the processing (for the
>> executors). And yes, the receiver remains on the same machine unless
>> some failure happens.
>>
>> Thanks
>> Best Regards
>>
>> On Tue, May 19, 2015 at 10:57 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> Thanks Akhil and Dibyendu.
>>>
>>> In high-level receiver-based streaming, do executors run on the
>>> receiver nodes themselves to get data locality?
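The rate-limiting point above can be illustrated with a small arithmetic sketch. This is plain Python, not Spark API code, and the message rate, partition count, and interval below are made-up figures for illustration only:

```python
# Hedged sketch: spark.streaming.kafka.maxRatePerPartition bounds how many
# records each generated batch of the direct stream may read from Kafka,
# no matter how large a backlog built up while earlier jobs were running.

def batch_record_cap(max_rate_per_partition, num_partitions, batch_interval_s):
    """Upper bound on records one direct-stream batch reads from Kafka."""
    return max_rate_per_partition * num_partitions * batch_interval_s

# Illustrative assumptions: a 1000 msg/s/partition cap, 4 Kafka partitions,
# and the thread's 1-second batch interval.
print(batch_record_cap(1000, 4, 1))  # 4000
```

Without such a cap, a batch generated after a long-running job could cover all the accumulated offsets at once, which is why the property is useful when processing time overruns the interval.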
>>> Or is data always transferred to executor nodes, with the executor
>>> nodes differing in each run of a job while the receiver node remains
>>> the same machine throughout the life of the streaming application,
>>> unless a node failure happens?

On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:

>>>> Just to add, there is a receiver-based Kafka consumer which uses the
>>>> Kafka low-level consumer API:
>>>>
>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>> On Tue, May 19, 2015 at 9:00 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> On Tue, May 19, 2015 at 8:10 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>
>>>>>> So for Kafka + Spark Streaming, receiver-based streaming uses the
>>>>>> high-level API and non-receiver-based streaming uses the low-level
>>>>>> API.
>>>>>>
>>>>>> 1. In high-level receiver-based streaming, does it register
>>>>>> consumers at each job start (whenever a new job is launched by the
>>>>>> streaming application, say every second)?
>>>>>
>>>>> -> Receiver-based streaming will always have the receiver running in
>>>>> parallel while your job is running, so by default, every 200 ms
>>>>> (spark.streaming.blockInterval) the receiver will generate a block of
>>>>> data which is read from Kafka.
>>>>>
>>>>>> 2. Will the number of executors in high-level receiver-based jobs
>>>>>> always equal the number of partitions in the topic?
>>>>>
>>>>> -> Not sure where you got that from. For the non-receiver based one,
>>>>> I think the number of partitions in Spark will be equal to the number
>>>>> of Kafka partitions for the given topic.
>>>>>
>>>>>> 3. Will data from a single topic be consumed by executors in
>>>>>> parallel, or does only one receiver consume it in multiple threads
>>>>>> and assign it to executors, in the high-level receiver-based
>>>>>> approach?
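Akhil's point about spark.streaming.blockInterval can be made concrete with a small arithmetic sketch (plain Python, not Spark code; 200 ms is the default value Spark documents for that property):

```python
# Hedged sketch: a receiver cuts its input stream into blocks every
# spark.streaming.blockInterval, and each block becomes one partition
# (hence one task) of the batch's RDD.

def blocks_per_batch(batch_interval_ms, block_interval_ms=200):
    """How many blocks (= RDD partitions) one receiver produces per batch."""
    return batch_interval_ms // block_interval_ms

# With the thread's 1-second batch interval and the 200 ms default,
# each receiver contributes 5 partitions per batch, regardless of how
# many partitions the Kafka topic has.
print(blocks_per_batch(1000))  # 5
```

This is also why, for the receiver-based approach, parallelism is tuned via the number of receivers and the block interval rather than via the Kafka partition count.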
>>>>> -> They will consume the data in parallel. For the receiver-based
>>>>> approach, you can actually specify the number of receivers that you
>>>>> want to spawn for consuming the messages.
>>>>>
>>>>>> On Tue, May 19, 2015 at 2:38 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> spark.streaming.concurrentJobs takes an integer value, not a
>>>>>>> boolean. If you set it to 2, then 2 jobs will run in parallel. The
>>>>>>> default value is 1, and the next job will start once the current
>>>>>>> one completes.
>>>>>>>
>>>>>>>> Actually, in the current implementation of Spark Streaming and
>>>>>>>> under the default configuration, only one job is active (i.e.
>>>>>>>> under execution) at any point of time. So if one batch's
>>>>>>>> processing takes longer than 10 seconds, then the next batch's
>>>>>>>> jobs will stay queued. This can be changed with an experimental
>>>>>>>> Spark property "spark.streaming.concurrentJobs", which is by
>>>>>>>> default set to 1. It's not currently documented (maybe I should
>>>>>>>> add it). The reason it is set to 1 is that concurrent jobs can
>>>>>>>> potentially lead to weird sharing of resources, which can make it
>>>>>>>> hard to debug whether there are sufficient resources in the
>>>>>>>> system to process the ingested data fast enough. With only 1 job
>>>>>>>> running at a time, it is easy to see that if batch processing
>>>>>>>> time < batch interval, then the system will be stable. Granted
>>>>>>>> that this may not be the most efficient use of resources under
>>>>>>>> certain conditions. We definitely hope to improve this in the
>>>>>>>> future.
>>>>>>>
>>>>>>> Copied from TD's answer written on SO:
>>>>>>> <http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming>
>>>>>>>
>>>>>>> Non-receiver based streaming, for example, covers the fileStream
>>>>>>> and directStream ones.
>>>>>>> You can read a bit more information here:
>>>>>>> https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Tue, May 19, 2015 at 2:13 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Akhil.
>>>>>>>> When I don't set spark.streaming.concurrentJobs, will all pending
>>>>>>>> jobs start one by one after the current job completes, or does it
>>>>>>>> not create the jobs which could not be started at their desired
>>>>>>>> interval?
>>>>>>>>
>>>>>>>> And what's the difference and usage of receiver vs non-receiver
>>>>>>>> based streaming? Is there any documentation for that?
>>>>>>>>
>>>>>>>> On Tue, May 19, 2015 at 1:35 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> It will be a single job running at a time by default (you can
>>>>>>>>> also configure spark.streaming.concurrentJobs to run jobs in
>>>>>>>>> parallel, which is not recommended in production).
>>>>>>>>>
>>>>>>>>> Now, with your batch duration being 1 sec and processing time
>>>>>>>>> being 2 minutes, if you are using receiver-based streaming, then
>>>>>>>>> those receivers will keep on receiving data while the job is
>>>>>>>>> running (which will accumulate in memory if you set the
>>>>>>>>> StorageLevel to MEMORY_ONLY, and end up in block-not-found
>>>>>>>>> exceptions as Spark drops blocks which are yet to be processed
>>>>>>>>> in order to accommodate new blocks). If you are using a
>>>>>>>>> non-receiver based approach, you will not have this problem of
>>>>>>>>> dropped blocks.
>>>>>>>>>
>>>>>>>>> Ideally, if your data is small and you have enough memory to
>>>>>>>>> hold it, then it will run smoothly without any issues.
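The backlog Akhil describes can be sketched with simple arithmetic. This is an illustrative simulation in plain Python, not Spark code, and the intervals are the ones quoted in the thread:

```python
# Hedged sketch of the receiver-based failure mode: the receiver keeps
# generating blocks every block interval while a slow job runs, so
# unprocessed blocks pile up in memory (and, under MEMORY_ONLY pressure,
# get dropped, later surfacing as block-not-found errors).

def backlog_blocks(elapsed_ms, processed_batches=0,
                   block_interval_ms=200, batch_interval_ms=1000):
    """Blocks generated but not yet consumed after elapsed_ms."""
    generated = elapsed_ms // block_interval_ms
    consumed = processed_batches * (batch_interval_ms // block_interval_ms)
    return generated - consumed

# Thread's numbers: 1 s batches, a job that takes 2 min (120,000 ms).
# While the first job runs, zero batches finish, so every block generated
# in those 2 minutes is still resident in memory.
print(backlog_blocks(120_000))  # 600
```

The direct (non-receiver) approach avoids this because nothing is buffered in Spark's memory ahead of processing; offsets are simply read from Kafka when each batch's job actually runs.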
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Tue, May 19, 2015 at 1:23 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> What happens in a streaming application if one job is not yet
>>>>>>>>>> finished when the stream interval is reached? Does it start the
>>>>>>>>>> next job, or wait for the first to finish while the rest of the
>>>>>>>>>> jobs keep accumulating in a queue?
>>>>>>>>>>
>>>>>>>>>> Say I have a streaming application with a stream interval of 1
>>>>>>>>>> sec, but my job takes 2 min to process 1 sec of stream. What
>>>>>>>>>> will happen? At any time will there be only one job running, or
>>>>>>>>>> multiple?
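The answer the thread converges on (one active job by default, with later batches queuing) can be sketched with a small simulation. Plain Python, not Spark code; it uses the intervals from the original question:

```python
# Hedged sketch: with the default spark.streaming.concurrentJobs=1,
# batches are still generated every batch interval, but each batch's job
# waits in a queue until the single active job finishes.

def queued_jobs(elapsed_s, batch_interval_s, processing_time_s):
    """Jobs waiting in the queue after elapsed_s seconds of runtime."""
    generated = elapsed_s // batch_interval_s    # batches created so far
    completed = elapsed_s // processing_time_s   # jobs finished so far
    running = 1 if generated > completed else 0  # the single active job
    return generated - completed - running

# Thread's numbers: 1 s interval, 2 min (120 s) per job. After 10 minutes,
# 600 batches were generated, only 5 jobs finished, one is running, and
# the rest sit in the queue -- the backlog grows without bound.
print(queued_jobs(600, 1, 120))  # 594
```

This is the quantitative version of TD's stability rule quoted above: the queue stays bounded only when batch processing time is less than the batch interval.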