Fetcher threads are created on a per-broker basis, which ensures that there
is at least one fetcher thread per broker. Each fetcher thread sends its
broker a fetch request that asks for all of the partitions it is responsible
for. So if A, B, and C are on the same broker, the fetcher thread is still
able to fetch data for A, B, and C in one request, even if A returns no data.
The same logic applies when the partitions are on different brokers.
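
To illustrate the point, here is a rough sketch. It is a toy simulation, not
Kafka code: the class FetcherSketch, the method fetch, and the in-memory
"broker log" map are all made-up names used only for illustration. It shows a
single fetch request covering partitions A, B, and C, where the empty
partition A simply contributes an empty result instead of holding up B and C.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; none of these names come from the Kafka code base.
class FetcherSketch {

    // brokerLog: partition name -> messages currently available on the broker.
    // requestedPartitions: every partition this fetcher asks for in one request.
    static Map<String, List<String>> fetch(Map<String, List<String>> brokerLog,
                                           List<String> requestedPartitions) {
        Map<String, List<String>> response = new LinkedHashMap<>();
        for (String partition : requestedPartitions) {
            List<String> pending = brokerLog.getOrDefault(partition, new ArrayList<>());
            // An empty partition yields an empty list; it does not stall the others.
            response.put(partition, new ArrayList<>(pending));
        }
        return response;
    }

    public static void main(String[] args) {
        Map<String, List<String>> brokerLog = new LinkedHashMap<>();
        brokerLog.put("A", List.of());             // partition A has no messages
        brokerLog.put("B", List.of("b1", "b2"));
        brokerLog.put("C", List.of("c1"));

        // One request for all three partitions on the same (simulated) broker.
        Map<String, List<String>> response = fetch(brokerLog, List.of("A", "B", "C"));
        System.out.println(response); // prints {A=[], B=[b1, b2], C=[c1]}
    }
}
```

Note that this only models the fetch side. The starvation case discussed
earlier in the thread (a full bounded queue blocking the fetcher) is a
separate issue on the consuming side and is not covered by this sketch.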

On Thu, Mar 12, 2015 at 6:25 AM, James Cheng <jch...@tivo.com> wrote:

>
> On Mar 11, 2015, at 9:12 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi James,
> >
> > What I meant before is that a single fetcher may be responsible for
> putting
> > fetched data to multiple queues according to the construction of the
> > streams setup, where each queue may be consumed by a different thread.
> And
> > the queues are actually bounded. Now say if there are two queues that are
> > getting data from the same fetcher F, and are consumed by two different
> > user threads A and B. If thread A for some reason got slowed / hung
> > consuming data from queue 1, then queue 1 will eventually get full, and F
> > trying to put more data to it will be blocked. Since F is parked on
> trying
> > to put data to queue 1, queue 2 will not get more data from it, and
> thread
> > B may hence get starved. Does that make sense now?
> >
>
> Yes, that makes sense. That is the scenario where one thread of a consumer
> can cause a backup in the queue, which would cause other threads to not
> receive data.
>
> What about the situation I described, where a thread consumes a queue that
> is supposed to be filled with messages from multiple partitions? If
> partition A has no messages and partitions B and C do, how will the fetcher
> behave? Will the processing thread receive messages from partitions B and C?
>
> Thanks,
> -James
>
>
> > Guozhang
> >
> > On Tue, Mar 10, 2015 at 5:15 PM, James Cheng <jch...@tivo.com> wrote:
> >
> >> Hi,
> >>
> >> Sorry to bring up this old thread, but my question is about this exact
> >> thing:
> >>
> >> Guozhang, you said:
> >>> A more concrete example: say you have topic AC: 3 partitions, topic
> BC: 6
> >>> partitions.
> >>>
> >>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5 threads
> will
> >>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
> respectively;
> >>>
> >>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads will
> be
> >>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4, AC-3/BC-5/BC-6
> >>> respectively.
> >>
> >>
> >> You said that in the createMessageStreamsByFilter case, if topic AC had
> no
> >> messages in it and consumer.timeout.ms = -1, then the 3 threads might
> all
> >> be blocked waiting for data to arrive from topic AC, and so messages
> from
> >> BC would not be processed.
> >>
> >> createMessageStreamsByFilter("*C" => 1) (single stream) would have the
> >> same problem but just worse. Behind the scenes, is there a single thread
> >> that is consuming (round-robin?) messages from the different partitions
> and
> >> inserting them all into a single queue for the application code to
> process?
> >> And that is why a single partition with no messages will block the other
> >> messages from getting through?
> >>
> >> What about createMessageStreams("AC" => 1)? That creates a single stream
> >> that contains messages from multiple partitions, which might be on
> >> different brokers. Does that also suffer the same problem, where if one
> >> partition has no messages, that the application would not receive
> messages
> >> from the other partitions?
> >>
> >> Thanks,
> >> -James
> >>
> >>
> >> On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> The new consumer will be released in 0.9, which is targeted for end of
> >> this
> >>> quarter.
> >>>
> >>> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com>
> wrote:
> >>>
> >>>> Do you know when the new consumer API will be publicly available?
> >>>>
> >>>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Yes, it can get stuck. For example, AC and BC are processed by two
> >>>>> different processes and the AC processor gets stuck, hence AC messages
> >> will
> >>>>> fill up in the consumer's buffer and eventually prevent the fetcher
> >>>> thread
> >>>>> from putting more data into it; the fetcher thread will be blocked on that
> >> and
> >>>>> not be able to fetch BC.
> >>>>>
> >>>>> This issue has been addressed in the new consumer client, which is
> >>>>> single-threaded with non-blocking APIs.
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> Thank you Guozhang for your detailed explanation. In your example
> >>>>>> createMessageStreamsByFilter("*C" => 3)  since threads are shared
> >> among
> >>>>>> topics there may be a situation where all 3 threads get stuck
> >>>> with
> >>>>>> topic AC e.g. topic is empty which will be holding the connecting
> >>>> threads
> >>>>>> (setting consumer.timeout.ms=-1) hence there is no thread to serve
> >>>> topic
> >>>>>> BC. Do you think this situation will happen?
> >>>>>>
> >>>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> I was not clear before .. for createMessageStreamsByFilter each
> >>>> matched
> >>>>>>> topic will have num-threads, but shared: i.e. there will be totally
> >>>>>>> num-threads created, but each thread will be responsible for
> fetching
> >>>>> all
> >>>>>>> matched topics.
> >>>>>>>
> >>>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
> >>>>> BC: 6
> >>>>>>> partitions.
> >>>>>>>
> >>>>>>> With createMessageStreams("AC" => 3, "BC" => 2) a total of 5
> threads
> >>>>> will
> >>>>>>> be created, and consuming AC-1,AC-2,AC-3,BC-1/2/3,BC-4/5/6
> >>>>> respectively;
> >>>>>>>
> >>>>>>> With createMessageStreamsByFilter("*C" => 3) a total of 3 threads
> >>>> will
> >>>>> be
> >>>>>>> created, and consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4,
> AC-3/BC-5/BC-6
> >>>>>>> respectively.
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> Guozhang,
> >>>>>>>>
> >>>>>>>> Do you mean that each regex matched topic owns number of threads
> >>>> that
> >>>>>> get
> >>>>>>>> passed in to createMessageStreamsByFilter ? For example in below
> >>>> code
> >>>>>> If
> >>>>>>> I
> >>>>>>>> have 3 matched topics each of which has 2 partitions then I should
> >>>>> have
> >>>>>>> 3 *
> >>>>>>>> 2 = 6 threads in total with each topic owning 2 threads.
> >>>>>>>>
> >>>>>>>> TopicFilter filter = new Whitelist(".*");
> >>>>>>>>
> >>>>>>>> int threadTotal = 2;
> >>>>>>>>
> >>>>>>>> List<KafkaStream<byte[], byte[]>> streams = connector
> >>>>>>>> .createMessageStreamsByFilter(filter, threadTotal);
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> But what I observed from the log is different
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>> following
> >>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with
> >>>> consumers:
> >>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> >>>>>>>> partition 1
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>>>> partition 0
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>> following
> >>>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
> >>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
> >>>>>>>> partitions consumed by consumer thread
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic
> >>>> kafkatopic-1
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>>>> partition 0
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the
> >>>>> following
> >>>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with
> >>>> consumers:
> >>>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
> >>>>>>>> partition 1
> >>>>>>>>
> >>>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
> >>>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
> >>>>>>>> partition 0
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> As you can see from the log there are only 2 threads created and
> >>>>> shared
> >>>>>>>> among 3 topics. With this setting I think the parallelism is
> >>>> degraded
> >>>>>>> and a
> >>>>>>>> slow topic may impact other topics' consumption performance. Any
> >>>>>>> thoughts?
> >>>>>>>>
> >>>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <
> >>>> wangg...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> createMessageStreams is used for consuming from specific
> >>>> topic(s),
> >>>>>>> where
> >>>>>>>>> you can put a map of [topic-name, num-threads] as its input
> >>>>>> parameters;
> >>>>>>>>>
> >>>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard
> >>>>>>> topics,
> >>>>>>>>> where you can put a (regex, num-threads) as its input parameters,
> >>>>> and
> >>>>>>> for
> >>>>>>>>> each regex matched topic num-threads will be created.
> >>>>>>>>>
> >>>>>>>>> The difference between these two is not really for throughput /
> >>>>>>> latency,
> >>>>>>>>> but rather consumption semantics.
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi team,
> >>>>>>>>>>
> >>>>>>>>>> I am comparing the differences between
> >>>>>>>>>> ConsumerConnector.createMessageStreams
> >>>>>>>>>> and ConsumerConnector.createMessageStreamsByFilter. My
> >>>>>> understanding
> >>>>>>> is
> >>>>>>>>>> that createMessageStreams creates x number of threads (x is the
> >>>>>>> number
> >>>>>>>> of
> >>>>>>>>>> threads passed in to the method) dedicated to the specified
> >>>> topic
> >>>>>>>>>> while createMessageStreamsByFilter creates x number of threads
> >>>>>> shared
> >>>>>>>> by
> >>>>>>>>>> topics specified by TopicFilter. Is it correct?
> >>>>>>>>>>
> >>>>>>>>>> If this is the case I assume createMessageStreams is the
> >>>>> preferred
> >>>>>>> way
> >>>>>>>> to
> >>>>>>>>>> create streams for each topic if I have high throughput and low
> >>>>>>> latency
> >>>>>>>>>> demands. Is my assumption correct?
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> Regards,
> >>>>>>>>>> Tao
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Regards,
> >>>>>>>> Tao
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Regards,
> >>>>>> Tao
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Regards,
> >>>> Tao
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>
> >>
> >
> >
> > --
> > -- Guozhang
>
>


-- 
Regards,
Tao
