Hi Andrew,

Thanks for your reply. PFB the answers.


   1. Are all 200 topics hosted on a single Kafka broker (a single node)?

[ans] :Yes, the single node Kafka server is integrated with Golden gate and
we need to consume transactions from around 250 tables ,each of which has a
corresponding topic in Kafka. There are plans to expand the number of Kafka
servers but for that to happen we need to prove that our data flow works
with a single node server.


   1. What is the number of partitions per topic?

[ans] : All the topics have just one partition.


   1. Which Kafka consumer for NiFi are you using?

I am using ConsumeKafka 1.5.0.3.1.0.0-564.

   1. Can you share the configuration of that?

Since its a single partition topic and we have a 4 node Nifi cluster so the
consume kafka processor is scheduled to run on Primary node only and 1
concurrent task. I read in one the articles that the optimal performance is
achieved if the no of partitions aligns with the concurrent tasks on the
consumer side. I've kept the Max poll records to 5000.
[image: image.png]
[image: image.png]


   1. How many nodes are in the NiFi cluster?

We have a 4 node Nifi cluster managed by Ambari ( HDF 3.1)  , Nifi (1.5.0)
, Kafka (1.0.0.3.1.0). Each node  has a 8 core cpu.

 >> NiFi consuming data from 200 topics should not be a problem for NiFi,
the parallelism if a factor driven by the partitions for the topics.
I am trying to find out the best practice consume for 200 topics. I tried
the following approaches so far

1 . 3 consumers each with 99 topics (separated with comma, all consumers
running on primary node with single concurrent tasks)
Result : The performance was very slow but it started consuming until the
back pressure was applied on the connections and after a while i started
getting this error
'Failed to retain connection due to No current assignment for partition
TEST_KAFKA_TOPIC'

The error went away on its own but then i started getting this error
Was interrupted while trying to communicate with Kafka with lease
org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@6cb8afba.
Will roll back session and discard any partially received data.

I thought maybe i did not have enough concurrent tasks available so i
increased it to 4 and scheduled the consumer to run on all nodes ( which
means 4 X 4 =16 tasks working on 99 single partition topics) . I also
increase the Maximum timer driven thread count to 32 which i understand is
the highest limit i can set on a 8 core cpu ( it should be at max 4 times
the number of cores on a single node? ). But i could not see any major
performance improvement and i got the above mentioned errors intermittently
. Because of the high back pressure i increased the individual connection
back-pressure threshold to 40K but that resulted in OOM errors, So i had to
set the JVM to 2048m .
I then tried the 2nd approach

2 . 250 consume_kafka processor with 1 topic each ( all consumers running
on primary node with single concurrent tasks)

The performance is a little better but then again i get a lot of errors
like this
'Failed to retain connection due to No current assignment for partition
TEST_KAFKA_TOPIC'
Was interrupted while trying to communicate with Kafka with lease
org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@6cb8afba.
Will roll back session and discard any partially received data.
I also added these configuration on the kafka broker
[image: image.png]

Please let me know of your opinion.

Regards,
Faisal





On Wed, Jun 20, 2018 at 6:50 PM Andrew Psaltis <psaltis.and...@gmail.com>
wrote:

> Hi Faisal,
> Sorry for the slow response. I do have a couple of questions:
>
>    1. Are all 200 topics hosted on a single Kafka broker (a single node)?
>    2. What is the number of partitions per topic?
>    3. Which Kafka consumer for NiFi are you using?
>    4. Can you share the configuration of that?
>    5. How many nodes are in the NiFi cluster?
>
>
> NiFi consuming data from 200 topics should not be a problem for NiFi,  the
> parallelism if a factor driven by the partitions for the topics.
>
> RE: I seem to have tried most of the options but i guess Nifi doesn't
> scale by default much like other distributed systems that i have worked
> with (mainly teradata) and needs to be configured alot before you can
> expect it be working.
>
> I'm not sure I quite understand - the configuration should be minimal to
> go from single node to cluster, the consumption rate from Kafka always
> starts with partitioning and proper broker setup.
>
> RE: Most of our load test/ performance test surprisingly worked fine in a
> single node environment. But it all seems to be working very badly as soon
> as we moved to multi node setup.
> Can you describe both the Kafka setup and the NiFi setup?
>
> RE:  In theory that would me that there should 200 task /thread running
> in parallel
> I'm not sure I follow this -- are you basing this on the number of topics?
>
>
> Thanks,
> Andrew
>
>
>
>
>
>
>
> On Tue, Jun 19, 2018 at 10:03 PM Faisal Durrani <te04.0...@gmail.com>
> wrote:
>
>> Hi Andrew,
>>
>> I was wondering if you got the chance to review the information i sent. I
>> seem to have tried most of the options but i guess Nifi doesn't scale by
>> default much like other distributed systems that i have worked with (mainly
>> teradata) and needs to be configured alot before you can expect it be
>> working.
>>
>> Most of our load test/ performance test surprisingly worked fine in a
>> single node environment. But it all seems to be working very badly as soon
>> as we moved to multi node setup.
>>
>> Is there any documented use case where Nifi worked with consume kafka
>> getting data from over 200 sources ( tables)  ? In theory that would me
>> that there should 200 task /thread running in parallel .
>>
>> On Thu, Jun 14, 2018 at 11:24 PM Faisal Durrani <te04.0...@gmail.com>
>> wrote:
>>
>>> hi Andrew,
>>> The kafka broker is hosted on a single node and this particular topic
>>> has just 1 partition. The consume kafka processor is scheduled to run only
>>> on primary node with 1 concurrent processor. Everything works well with
>>> about 50 consumers consuming from 50 topics of the same nature. When we
>>> start consuming from Over 100-200 consumer all these errors come. Because
>>> of the back pressure alot of consumers have to wait so to get around that i
>>> set each of the processor with additional property of timeout.ms set to
>>> 70000 but that did not work.  Quite strangly the consumer also sometimes
>>> starts consuming those messages which it has already consumed in the past
>>> so i think there is some thing also wrong with the commit configuration.
>>> There is some other additional property that guess i need to setup on the
>>> broker side which will make it scalable. But i'm unable to find it. Kindly
>>> let me know if you have faced a similar situation.
>>>
>>> On Thu, 14 Jun 2018, 11:10 p.m. Andrew Psaltis, <
>>> psaltis.and...@gmail.com> wrote:
>>>
>>>> Hi Faisal,
>>>> How many partitions are there for that TEST_KAFKA_TOPIC topic?
>>>>
>>>> On Thu, Jun 14, 2018 at 9:06 PM Faisal Durrani <te04.0...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Mark, The heap size is set to 4 gb and the time driven threas count
>>>>> is set to 32 since its a 8 core  node( we have a cluster of 4 nifi nodes) 
>>>>> .
>>>>> Despite of all this we keep getting these errors :( .
>>>>>
>>>>>
>>>>>
>>>>> On Thu, 14 Jun 2018, 10:02 p.m. Mark Payne, <marka...@hotmail.com>
>>>>> wrote:
>>>>>
>>>>>> Faisal,
>>>>>>
>>>>>> How much heap do you have allocated to your NiFi instance? In
>>>>>> conf/bootstrap.conf
>>>>>> the default value is 512 MB. If you haven't changed that, you could
>>>>>> be just running out of
>>>>>> heap.
>>>>>>
>>>>>> Also, have you changed the maximum number of threads available to
>>>>>> your NiFi instance?
>>>>>> In the top-right menu you can go to Controller Settings. The default
>>>>>> for "Maximum Timer Driven
>>>>>> Thread Count" is 10, but you'll definitely want to increase that for
>>>>>> your use case.
>>>>>>
>>>>>> Also, how many cores does the VM/node that NiFi is running on have?
>>>>>>
>>>>>> Thanks
>>>>>> -Mark
>>>>>>
>>>>>>
>>>>>> On Jun 14, 2018, at 3:39 AM, Faisal Durrani <te04.0...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> it appears that the error only comes when i increase the number of
>>>>>> consumers. It seems to work fine with around 100 consumers but as soon 
>>>>>> as i
>>>>>> bump it up at about 150 consumers all hell breaks lose with error such as
>>>>>> below
>>>>>>
>>>>>> "Commit cannot be completed due to group rebalance"
>>>>>> "Failed to retain connection due to No current assignment for
>>>>>> partition TEST_KAFKA_TOPIC: "
>>>>>>
>>>>>> Quite strangely the consumers start consuming messages that have
>>>>>> already been read before. I cant wrap me head around why that is 
>>>>>> happening.
>>>>>> Can someone point to the right direction here? I tried wild carding
>>>>>> all the topics in one consumer but then it becomes very slow. All the
>>>>>> consumers are running on the primary node with single concurrent tasks .
>>>>>>
>>>>>> On Mon, Jun 11, 2018 at 11:17 AM Joe Witt <joe.w...@gmail.com> wrote:
>>>>>>
>>>>>>> So you have a unique instance of the ConsumeKafka proc for each
>>>>>>> topic rhen, right?
>>>>>>>
>>>>>>> Id increase the flow controller thread pool size by quite a bit as
>>>>>>> well.
>>>>>>>
>>>>>>> On Sun, Jun 10, 2018, 10:13 PM Faisal Durrani <te04.0...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Yes the kafka service is hosted on a single server while NIFI is on
>>>>>>>> a cluster of 4 servers. I'm not entirely sure what wild carding of 
>>>>>>>> topics
>>>>>>>> is but kafka is integrated with a Oracle golden gate and the topics are
>>>>>>>> auto generated as soon as a new table is created in Oracle.
>>>>>>>>
>>>>>>>> "If you want that in a single instance you may need to alter the
>>>>>>>> timeout associated with any single kafka consumer. "
>>>>>>>> Can let me know which configuration is this?
>>>>>>>>
>>>>>>>> The consumer kafka processors are scheduled(Timer driven ) to run
>>>>>>>> on all four nodes with 4 concurrent task so i assume there will be 16
>>>>>>>> threads.( the Maximum timer driven thread count is set to 40)
>>>>>>>>
>>>>>>>> We tested the whole data flow with about 15-20 consumers and
>>>>>>>> everything worked fine with out any errors. We started getting all 
>>>>>>>> these
>>>>>>>> wired errors as soon as we bumped up our load test with 150+ consume 
>>>>>>>> kafka
>>>>>>>> processors.
>>>>>>>>
>>>>>>>> On Mon, Jun 11, 2018 at 10:59 AM Joe Witt <joe.w...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello
>>>>>>>>>
>>>>>>>>> Is this a single instance with wildcarding of topics?  Please
>>>>>>>>> share config details.
>>>>>>>>>
>>>>>>>>> If you want that in a single instance you may need to alter the
>>>>>>>>> timeout associated with any single kafka consumer.  The assignment 
>>>>>>>>> will be
>>>>>>>>> per topic per partion.  How many he threads for that processor?
>>>>>>>>>
>>>>>>>>> Finally, consider using ConsumeKafkaRecord and if you are using
>>>>>>>>> kaka 1 or newer use the latest processor.
>>>>>>>>>
>>>>>>>>> thanks
>>>>>>>>>
>>>>>>>>> On Sun, Jun 10, 2018, 9:21 PM Faisal Durrani <te04.0...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Does anyone know about this error from Kafka? I am using Nifi
>>>>>>>>>> 1.5.0 with ConsumerKafka processor.
>>>>>>>>>>
>>>>>>>>>> ConsumeKafka[id=34753ed3-9dd6-15ed-9c91-147026236eee] Failed to
>>>>>>>>>> retain connection due to No current assignment for partition
>>>>>>>>>> TEST_KAFKA_TOPIC:
>>>>>>>>>>
>>>>>>>>>> This is the first time we are testing Nifi to consume from over
>>>>>>>>>> 200 topics and its failing terribly so far. When this error goes the 
>>>>>>>>>> other
>>>>>>>>>> one comes up which is as below
>>>>>>>>>>
>>>>>>>>>> Was interrupted while trying to communicate with Kafka with lease
>>>>>>>>>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$
>>>>>>>>>> SimpleConsumerLease@6cb8afba. Will roll back session and discard
>>>>>>>>>> any partially received data.
>>>>>>>>>>
>>>>>>>>>
>>>>>>

Reply via email to