Hi,

Sorry to bring up this old thread, but my question is about this exact thing:

Guozhang, you said:
> A more concrete example: say you have topic AC: 3 partitions, topic BC: 6
> partitions.
> 
> With createMessageStreams("AC" => 3, "BC" => 2), a total of 5 threads will
> be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and BC-4/5/6 respectively;
> 
> With createMessageStreamsByFilter("*C" => 3), a total of 3 threads will be
> created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and AC-3/BC-5/BC-6
> respectively.
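
To make the two cases concrete, here is a minimal sketch that reproduces the stream-to-partition mapping in the example. It treats each "thread" as one stream and assumes a range-style split, where the first (partitions % streams) streams each get one extra partition. The class and method names are hypothetical and are not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka's code: assumes a range-style split of each
// topic's partitions over streams. Partitions are numbered from 1 only to match
// the AC-1..AC-3 / BC-1..BC-6 labels in the example.
class StreamAssignmentSketch {

    // Splits the partitions of one topic over `streams` streams; the first
    // (partitions % streams) streams get one extra partition.
    static List<List<String>> rangeAssign(String topic, int partitions, int streams) {
        List<List<String>> result = new ArrayList<>();
        int perStream = partitions / streams;
        int extra = partitions % streams;
        int next = 1;
        for (int s = 0; s < streams; s++) {
            List<String> owned = new ArrayList<>();
            int count = perStream + (s < extra ? 1 : 0);
            for (int i = 0; i < count; i++) {
                owned.add(topic + "-" + next++);
            }
            result.add(owned);
        }
        return result;
    }

    // createMessageStreams-style: every topic gets its own dedicated streams.
    static List<List<String>> dedicated(Map<String, Integer> partitions,
                                        Map<String, Integer> streamsPerTopic) {
        List<List<String>> all = new ArrayList<>();
        for (String topic : streamsPerTopic.keySet()) {
            all.addAll(rangeAssign(topic, partitions.get(topic), streamsPerTopic.get(topic)));
        }
        return all;
    }

    // createMessageStreamsByFilter-style: `streams` streams in total, shared by
    // every matched topic; each topic is split across the same streams.
    static List<List<String>> shared(Map<String, Integer> partitions, int streams) {
        List<List<String>> all = new ArrayList<>();
        for (int s = 0; s < streams; s++) {
            all.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> e : partitions.entrySet()) {
            List<List<String>> split = rangeAssign(e.getKey(), e.getValue(), streams);
            for (int s = 0; s < streams; s++) {
                all.get(s).addAll(split.get(s));
            }
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("AC", 3);
        partitions.put("BC", 6);
        Map<String, Integer> streamsPerTopic = new LinkedHashMap<>();
        streamsPerTopic.put("AC", 3);
        streamsPerTopic.put("BC", 2);

        // [[AC-1], [AC-2], [AC-3], [BC-1, BC-2, BC-3], [BC-4, BC-5, BC-6]]
        System.out.println(dedicated(partitions, streamsPerTopic));
        // [[AC-1, BC-1, BC-2], [AC-2, BC-3, BC-4], [AC-3, BC-5, BC-6]]
        System.out.println(shared(partitions, 3));
    }
}
```

The dedicated case yields 3 + 2 = 5 streams, while the shared case yields exactly the 3 streams passed in, each holding partitions from both topics.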


You said that in the createMessageStreamsByFilter case, if topic AC had no 
messages in it and consumer.timeout.ms = -1, then the 3 threads might all be 
blocked waiting for data to arrive from topic AC, and so messages from BC would 
not be processed.
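
For reference, consumer.timeout.ms controls how long a stream's iterator waits for the next message; -1 (the default) means it waits indefinitely. The sketch below models that choice with a plain BlockingQueue standing in for a stream's internal queue. The class, method and exception names are hypothetical; in the old consumer the timeout case surfaces as kafka.consumer.ConsumerTimeoutException.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of consumer.timeout.ms semantics, not Kafka's iterator code.
class TimeoutSketch {

    // Stand-in for the timeout exception the real consumer would throw.
    static class StreamTimeoutException extends RuntimeException {
        StreamTimeoutException(String msg) {
            super(msg);
        }
    }

    // Negative timeout: block until a message arrives (consumer.timeout.ms = -1).
    // Non-negative timeout: give up and throw after that many milliseconds.
    static <T> T next(BlockingQueue<T> queue, long timeoutMs) throws InterruptedException {
        if (timeoutMs < 0) {
            return queue.take();
        }
        T item = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (item == null) {
            throw new StreamTimeoutException("no message within " + timeoutMs + " ms");
        }
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("BC-1:m0");
        System.out.println(next(queue, 100)); // BC-1:m0
        try {
            next(queue, 100); // queue is now empty
        } catch (StreamTimeoutException e) {
            System.out.println(e.getMessage()); // no message within 100 ms
        }
        // next(queue, -1) would block this thread until something is put into the queue.
    }
}
```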

createMessageStreamsByFilter("*C" => 1) (a single stream) would have the same
problem, only worse. Behind the scenes, is there a single thread that consumes
messages (round-robin?) from the different partitions and inserts them all
into a single queue for the application code to process? Is that why a single
partition with no messages would block the other messages from getting through?
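
A minimal model of the design asked about here, assuming one thread visits each partition in turn and copies whatever it finds into a single unbounded queue that the application drains. The class and method names are hypothetical, and this is not Kafka's fetcher code; it only shows what that assumed design would do with an empty partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical round-robin model: one thread, many partitions, one shared queue.
class RoundRobinSketch {

    // One fetch round: each partition hands over its pending messages, if any.
    // An empty partition contributes nothing and the loop moves on to the next one.
    static void fetchRound(Map<String, List<String>> partitions, BlockingQueue<String> queue)
            throws InterruptedException {
        for (Map.Entry<String, List<String>> p : partitions.entrySet()) {
            for (String msg : p.getValue()) {
                queue.put(p.getKey() + ":" + msg); // never blocks on an unbounded queue
            }
            p.getValue().clear(); // these messages have now been handed over
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<String>> partitions = new LinkedHashMap<>();
        partitions.put("AC-1", new ArrayList<>()); // empty partition
        partitions.put("BC-1", new ArrayList<>(List.of("m0", "m1")));
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        fetchRound(partitions, queue);
        List<String> drained = new ArrayList<>();
        queue.drainTo(drained);
        System.out.println(drained); // [BC-1:m0, BC-1:m1]
    }
}
```

In this model a round can only stall when the queue is bounded and full; an empty partition by itself does not hold anything up.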

What about createMessageStreams("AC" => 1)? That creates a single stream that
contains messages from multiple partitions, which might be on different
brokers. Does that suffer from the same problem, where, if one partition has no
messages, the application would not receive messages from the other
partitions?

Thanks,
-James


On Feb 11, 2015, at 8:13 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> The new consumer will be released in 0.9, which is targeted for end of this
> quarter.
> 
> On Tue, Feb 10, 2015 at 7:11 PM, tao xiao <xiaotao...@gmail.com> wrote:
> 
>> Do you know when the new consumer API will be publicly available?
>> 
>> On Wed, Feb 11, 2015 at 10:43 AM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> 
>>> Yes, it can get stuck. For example, suppose AC and BC are processed by two
>>> different processes and the AC processor gets stuck. AC messages will then
>>> fill up the consumer's buffer and eventually prevent the fetcher thread
>>> from putting more data into it; the fetcher thread will block on that and
>>> will not be able to fetch BC.
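
A minimal sketch of the stall described above, assuming a single fetcher thread that hands chunks to two bounded per-stream queues while the AC processor has stopped draining its queue. The class name, queue capacities and chunk counts are hypothetical; offer() with a timeout stands in for the fetcher's blocking put so the sketch reports the stall instead of hanging.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of head-of-line blocking in a shared fetcher, not Kafka's code.
class FetcherStallSketch {

    // The fetcher alternates between delivering an AC chunk and a BC chunk.
    // Returns how many BC chunks were delivered before the fetcher got stuck.
    static int runFetcher(BlockingQueue<String> acQueue, BlockingQueue<String> bcQueue,
                          int rounds, long stallMs) throws InterruptedException {
        int bcDelivered = 0;
        for (int i = 0; i < rounds; i++) {
            // A real blocking put() would wait forever once the AC queue is full;
            // offer() with a timeout lets the sketch detect that and stop.
            if (!acQueue.offer("AC chunk " + i, stallMs, TimeUnit.MILLISECONDS)) {
                System.out.println("fetcher blocked on full AC queue at round " + i);
                return bcDelivered;
            }
            // Nobody drains BC either in this sketch, so its queue must be
            // large enough for every round or this put() would block too.
            bcQueue.put("BC chunk " + i);
            bcDelivered++;
        }
        return bcDelivered;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> acQueue = new ArrayBlockingQueue<>(2);   // stalled AC stream
        BlockingQueue<String> bcQueue = new ArrayBlockingQueue<>(100); // plenty of room
        int delivered = runFetcher(acQueue, bcQueue, 10, 100);
        System.out.println("BC chunks delivered: " + delivered);
    }
}
```

With an AC queue of capacity 2, the fetcher delivers two BC chunks and then stalls on the third AC chunk, so BC starves even though its own queue still has room.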
>>> 
>>> This issue has been addressed in the new consumer client, which is
>>> single-threaded with non-blocking APIs.
>>> 
>>> Guozhang
>>> 
>>> On Tue, Feb 10, 2015 at 6:24 PM, tao xiao <xiaotao...@gmail.com> wrote:
>>> 
>>>> Thank you, Guozhang, for your detailed explanation. In your example
>>>> createMessageStreamsByFilter("*C" => 3), since threads are shared among
>>>> topics, there may be a situation where all 3 threads get stuck on
>>>> topic AC (e.g. the topic is empty), which will hold up the consuming
>>>> threads (with consumer.timeout.ms=-1), so there is no thread left to
>>>> serve topic BC. Do you think this situation can happen?
>>>> 
>>>> On Wed, Feb 11, 2015 at 2:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>> 
>>>>> I was not clear before. For createMessageStreamsByFilter, each matched
>>>>> topic will have num-threads, but they are shared: i.e. there will be
>>>>> num-threads created in total, and each thread will be responsible for
>>>>> fetching all matched topics.
>>>>> 
>>>>> A more concrete example: say you have topic AC: 3 partitions, topic
>>>>> BC: 6 partitions.
>>>>> 
>>>>> With createMessageStreams("AC" => 3, "BC" => 2), a total of 5 threads
>>>>> will be created, consuming AC-1, AC-2, AC-3, BC-1/2/3 and BC-4/5/6
>>>>> respectively;
>>>>> 
>>>>> With createMessageStreamsByFilter("*C" => 3), a total of 3 threads will
>>>>> be created, consuming AC-1/BC-1/BC-2, AC-2/BC-3/BC-4 and AC-3/BC-5/BC-6
>>>>> respectively.
>>>>> 
>>>>> Guozhang
>>>>> 
>>>>> On Tue, Feb 10, 2015 at 8:37 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>> 
>>>>>> Guozhang,
>>>>>> 
>>>>>> Do you mean that each regex-matched topic owns the number of threads
>>>>>> passed in to createMessageStreamsByFilter? For example, in the code
>>>>>> below, if I have 3 matched topics, each of which has 2 partitions, then
>>>>>> I should have 3 * 2 = 6 threads in total, with each topic owning 2
>>>>>> threads.
>>>>>> 
>>>>>> // connector: a kafka.javaapi.consumer.ConsumerConnector created elsewhere
>>>>>> TopicFilter filter = new Whitelist(".*");
>>>>>> int threadTotal = 2;
>>>>>> List<KafkaStream<byte[], byte[]>> streams =
>>>>>>         connector.createMessageStreamsByFilter(filter, threadTotal);
>>>>>> 
>>>>>> 
>>>>>> But what I observed in the log is different:
>>>>>> 
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-5 with consumers:
>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>> 
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>> partition 1
>>>>>> 
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>> partition 0
>>>>>> 
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>> partitions: ArrayBuffer(0) for topic kafkatopic-1 with consumers:
>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>> 
>>>>>> 2015-02-11 00:24:04 WARN  kafka.utils.Logging$class:83 - No broker
>>>>>> partitions consumed by consumer thread
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 for topic kafkatopic-1
>>>>>> 
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>> partition 0
>>>>>> 
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 - Consumer
>>>>>> test1234dd5_localhost-1423585444070-82f23758 rebalancing the following
>>>>>> partitions: ArrayBuffer(0, 1) for topic kafkatopic-4 with consumers:
>>>>>> List(test1234dd5_localhost-1423585444070-82f23758-0,
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1)
>>>>>> 
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>> test1234dd5_localhost-1423585444070-82f23758-1 attempting to claim
>>>>>> partition 1
>>>>>> 
>>>>>> 2015-02-11 00:24:04 INFO  kafka.utils.Logging$class:68 -
>>>>>> test1234dd5_localhost-1423585444070-82f23758-0 attempting to claim
>>>>>> partition 0
>>>>>> 
>>>>>> 
>>>>>> As you can see from the log, only 2 threads are created and shared
>>>>>> among 3 topics. With this setting, I think parallelism is degraded
>>>>>> and a slow topic may hurt the other topics' consumption performance.
>>>>>> Any thoughts?
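
The log is consistent with Guozhang's clarification that a filter creates num-threads in total, shared by all matched topics. A minimal sketch reproducing it, assuming a range-style split in which the first (partitions % threads) threads each get one extra partition, and using the partition lists exactly as the log prints them (kafkatopic-5: 0, 1; kafkatopic-1: 0; kafkatopic-4: 0, 1). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-creation of the logged assignment, not Kafka's rebalance code.
class LogAssignmentSketch {

    // Partition ids (numbered from 0) owned by consumer thread `thread` out of
    // `threads`, for a topic with `partitions` partitions.
    static List<Integer> owned(int partitions, int threads, int thread) {
        int perThread = partitions / threads;
        int extra = partitions % threads;
        int start = thread * perThread + Math.min(thread, extra);
        int count = perThread + (thread < extra ? 1 : 0);
        List<Integer> result = new ArrayList<>();
        for (int p = start; p < start + count; p++) {
            result.add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] topics = {"kafkatopic-5", "kafkatopic-1", "kafkatopic-4"};
        int[] partitionCounts = {2, 1, 2};
        int threads = 2; // threadTotal in the snippet above
        for (int t = 0; t < topics.length; t++) {
            for (int thread = 0; thread < threads; thread++) {
                List<Integer> mine = owned(partitionCounts[t], threads, thread);
                System.out.println(topics[t] + " thread-" + thread + " -> "
                        + (mine.isEmpty() ? "no partitions" : mine));
            }
        }
    }
}
```

For kafkatopic-1, thread 1 gets no partitions, which matches the WARN line, while both threads still serve kafkatopic-5 and kafkatopic-4.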
>>>>>> 
>>>>>> On Wed, Feb 11, 2015 at 12:00 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> createMessageStreams is used for consuming from specific topic(s),
>>>>>>> where you pass a map of [topic-name, num-threads] as its input
>>>>>>> parameter;
>>>>>>>
>>>>>>> createMessageStreamsByFilter is used for consuming from wildcard
>>>>>>> topics, where you pass a (regex, num-threads) pair as its input
>>>>>>> parameters, and for each regex-matched topic num-threads will be
>>>>>>> created.
>>>>>>> 
>>>>>>> The difference between these two is not really about throughput /
>>>>>>> latency, but rather about consumption semantics.
>>>>>>> 
>>>>>>> Guozhang
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Feb 10, 2015 at 3:02 AM, tao xiao <xiaotao...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hi team,
>>>>>>>> 
>>>>>>>> I am comparing ConsumerConnector.createMessageStreams and
>>>>>>>> ConsumerConnector.createMessageStreamsByFilter. My understanding is
>>>>>>>> that createMessageStreams creates x threads (x is the number of
>>>>>>>> threads passed in to the method) dedicated to the specified topic,
>>>>>>>> while createMessageStreamsByFilter creates x threads shared by the
>>>>>>>> topics specified by the TopicFilter. Is that correct?
>>>>>>>> 
>>>>>>>> If this is the case, I assume createMessageStreams is the preferred
>>>>>>>> way to create streams for each topic if I have high-throughput and
>>>>>>>> low-latency demands. Is my assumption correct?
>>>>>>>> 
>>>>>>>> --
>>>>>>>> Regards,
>>>>>>>> Tao
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Regards,
>>>>>> Tao
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> -- Guozhang
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> Regards,
>>>> Tao
>>>> 
>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>>> 
>> 
>> 
>> 
>> --
>> Regards,
>> Tao
>> 
> 
> 
> 
> --
> -- Guozhang
