Samra,

Another thing to check when testing this: before each run, make sure you clear out the state/offset we're tracking for that client and change the 'Group ID'.
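As a rough illustration of the 'Group ID' suggestion above, here is a minimal Python sketch that builds a different group name for every test run. The helper name `group_id_for_run` and the `testgroup` base are made up for this example; nothing here is a NiFi or Kafka API, it only produces a string you could paste into the processor's 'Group ID' property.

```python
import uuid


def group_id_for_run(base="testgroup", run_label=None):
    """Return a Group ID that is unique to one test run (hypothetical helper)."""
    # A consumer group that Kafka has never seen has no committed offsets,
    # so positions left over from an earlier run cannot affect this one.
    suffix = run_label if run_label is not None else uuid.uuid4().hex[:8]
    return f"{base}-{suffix}"


# With an explicit label the result is predictable; without one it is random.
print(group_id_for_run(run_label="run1"))
print(group_id_for_run())
```

Any scheme that yields a previously unused name would do; the random suffix is just the least effort.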
Each thread the processor has will be assigned zero to n partitions. A thread could get zero partitions if, for instance, there are more processor threads than there are partitions in Kafka (that would be pretty odd, but just throwing it out there). Otherwise a NiFi processor thread will have 1..n partitions it is responsible for. The client will ascertain its proper offset from Kafka ('latest' meaning it will start at the end of that partition when the client is created). You might also wish to set your 'Offset Reset' to 'earliest' to ensure there is no race condition between creating that consumer (and getting its offset) and the data being placed into the partition by whatever producer you have.

Thanks
Joe

On Wed, Jan 25, 2017 at 3:09 PM, Joe Witt <[email protected]> wrote:
> Thanks. Could you just let the data queue up in NiFi so you can see
> the backlogged count? I'm concerned about overlapping filenames/IO
> issues that could impact how much gets written to disk as well.
>
> Thanks
> Joe
>
> On Wed, Jan 25, 2017 at 3:05 PM, Samra Kasim
> <[email protected]> wrote:
>> Sure. On the Configure Processors I have set:
>> Properties:
>> - Kafka Brokers to the 3 broker nodes on my cluster
>> - Security Protocol: PLAINTEXT
>> - Topic Name: testtopic
>> - Group ID: testgroup
>> - Offset Reset: latest
>> - Key Attribute Encoding: UTF-8 Encoded
>> - Max Poll Records: 10000
>> - Max Uncommited Time: 1 secs
>>
>> In scheduling, I have the Concurrent Tasks to be 1 for each processor.
>>
>> The input data is a json string. I have a test data set of 200,000 records.
>> My flow goes from the 3 ConsumeKafka processors to a PutFile processor,
>> which saves the flowfiles to a directory. I do a count on the directory and
>> so am able to see if all the records made it.
>>
>> On Wed, Jan 25, 2017 at 2:57 PM, Joe Witt <[email protected]> wrote:
>>>
>>> Hello
>>>
>>> Can you share a bit more about the details of the ConsumeKafka
>>> processor and its configuration. What are the settings you have? Can
>>> you describe a bit more about the input data and how you're
>>> determining there is loss?
>>>
>>> Thanks
>>> Joe
>>>
>>> On Wed, Jan 25, 2017 at 2:47 PM, Samra Kasim
>>> <[email protected]> wrote:
>>> > Hi,
>>> >
>>> > I am new to NiFi and I am reading off a Kafka topic that has 3
>>> > partitions.
>>> > In my Nifi flow, I have 3 ConsumeKafka processors with the same groupId
>>> > and
>>> > Topic. However, when I push large datasets (e.g., 200,000+), 300-400
>>> > records
>>> > don't make it to the next processor. This only happens when I have the
>>> > Concurrent Tasks in the Scheduling Tab set to more than 1 (e.g., 2 or
>>> > 3). If
>>> > I have the Concurrent Tasks set to 1 then all the records make it
>>> > through to
>>> > the next processor just fine.
>>> >
>>> > I may need to define kafka.partitions to have each Nifi processor point
>>> > to a
>>> > specific Kafka partition, but am not sure where/how to do that. I tried
>>> > adding it to the properties, but that doesn't work. Has anyone else
>>> > worked
>>> > through this issue?
>>> >
>>> > I am using Nifi 1.1.1 and Kafka 0.9
>>> >
>>> > --
>>> >
>>> >
>>> > Sam
>>> >
>>> >
>>
>>
>> --
>>
>>
>> Samra Kasim
>>
>> Technologist
>> HUMANgEO
>>
>> Virginia Office
>> 4350 N Fairfax Drive
>> Suite 950
>> Arlington, VA 22203
>>
>> E-Mail: [email protected]
>> Web: http://www.thehumangeo.com/
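
To make the "zero to n partitions per thread" point at the top of this message concrete, here is a small Python sketch that spreads partitions across consumer threads. It uses a plain round-robin that I picked for illustration; it is not Kafka's actual assignment strategy, and `assign_partitions` is a made-up name. It only shows how many partitions each thread can end up with. I am also assuming that every concurrent task acts as its own consumer in the group.

```python
def assign_partitions(num_partitions, num_threads):
    """Spread partition numbers across threads round-robin (illustration only)."""
    # Every thread starts with an empty list, so idle threads stay visible.
    assignment = {thread: [] for thread in range(num_threads)}
    for partition in range(num_partitions):
        assignment[partition % num_threads].append(partition)
    return assignment


# 3 partitions, 3 threads (3 processors x 1 task): one partition each.
print(assign_partitions(3, 3))  # {0: [0], 1: [1], 2: [2]}

# 3 partitions, 6 threads (3 processors x 2 tasks): three threads get nothing.
print(assign_partitions(3, 6))  # {0: [0], 1: [1], 2: [2], 3: [], 4: [], 5: []}

# 3 partitions, 1 thread: that thread is responsible for all of them.
print(assign_partitions(3, 1))  # {0: [0, 1, 2]}
```

Under that assumption, raising Concurrent Tasks above 1 on three processors reading a three-partition topic leaves some threads with no partition at all, since within one group a partition is read by only one consumer at a time.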
