I had it running and was adding new content to the queue that other
consumers were able to see.

On Tue, Nov 13, 2018 at 1:15 PM Mark Payne <[email protected]> wrote:

> Mike,
>
> Is there new data coming into the Kafka topic? By default, when the
> Processor is started, it uses an offset reset of 'latest'. So if you
> started the Processor with this setting, the committed offset is saved
> pointing to the end of the topic, and if no more data is coming into the
> topic, you'll not see anything out of the processor. Was the ConsumeKafka
> processor perhaps started with the offset at 'earliest'?
>
> Thanks
> -Mark
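
A minimal sketch of the offset-reset behavior described above, written as a plain-Java simulation rather than a call into the Kafka client. The class and method names are hypothetical; the only thing taken from Kafka is the set of values accepted by the `auto.offset.reset` consumer setting ('earliest', 'latest', 'none') and the rule that it applies only when the consumer group has no committed offset.

```java
final class OffsetResetSketch {

    /**
     * Returns the offset a consumer group would start reading from.
     *
     * @param committed the group's committed offset, or null if it has none
     * @param logStart  first offset still available in the partition
     * @param logEnd    offset that the next produced record will receive
     * @param reset     value of auto.offset.reset: earliest, latest, or none
     */
    static long startingOffset(Long committed, long logStart, long logEnd, String reset) {
        // A committed offset always wins; the reset policy is only a fallback.
        if (committed != null) {
            return committed;
        }
        switch (reset) {
            case "earliest":
                return logStart; // replay everything still in the partition
            case "latest":
                return logEnd;   // only records produced from now on are seen
            case "none":
                throw new IllegalStateException("no committed offset for this group");
            default:
                throw new IllegalArgumentException("unknown reset policy: " + reset);
        }
    }
}
```

With a partition holding offsets 0 through 41, a new group using 'latest' starts at 42 and sees nothing until another record is produced, while a new group using 'earliest' starts at 0.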
>
> On Nov 13, 2018, at 12:54 PM, Mike Thomsen <[email protected]> wrote:
>
> That would appear to be the case. So here's what I was doing:
>
> 1. Used this sort of code to serialize the Avro:
>
>     private <T> byte[] serialize(T obj, Class<T> clz) throws Exception {
>         SpecificDatumWriter<T> writer = new SpecificDatumWriter<>(clz);
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>         // Writes only the raw binary datum; no schema is embedded.
>         writer.write(obj, encoder);
>         encoder.flush();
>         out.close();
>
>         return out.toByteArray();
>     }
>
> So that got me a byte array with just the binary representation.
>
> 2. Produced that to the Kafka topic.
>
> 3. Had an AvroReader variously configured to use schema.name hard set to
> an entry on the AvroSchemaRegistry or EmbeddedSchema.
>
> Didn't see any flowfiles get emitted to parse.failure or success.
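
One quick way to tell which form of Avro a producer is putting on the topic is to look at the first four bytes of a message. An Avro object container file, which is the form that carries an embedded schema, begins with the magic bytes 'O', 'b', 'j', 0x01; the raw output of a BinaryEncoder has no such header. The sketch below uses only the JDK, and the class and method names are hypothetical. It is a heuristic: a bare datum could in principle start with the same four bytes.

```java
import java.util.Arrays;

final class AvroPayloadCheck {

    // Magic header of the Avro object container file format.
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Returns true if the payload starts with the container-file magic bytes. */
    static boolean hasContainerHeader(byte[] payload) {
        if (payload == null || payload.length < MAGIC.length) {
            return false;
        }
        return Arrays.equals(Arrays.copyOf(payload, MAGIC.length), MAGIC);
    }
}
```

If this returns false for messages read with the console consumer, a reader that expects an embedded schema will generally not be able to parse them, and the reader would need an explicitly configured schema instead.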
>
> On Tue, Nov 13, 2018 at 12:50 PM Joe Witt <[email protected]> wrote:
>
>> Mike - so does this mean the parse.failure relationship wasn't working
>> though?  We should try to dig into this more if you're up for it or
>> sharing more details.  We don't want the behavior you ran into for
>> sure...
>> On Tue, Nov 13, 2018 at 12:49 PM Mike Thomsen <[email protected]>
>> wrote:
>> >
>> > So after a lot of diving into 1.9.X, it **appears** that there was some
>> sort of combination w/ the AvroReader + ConsumeKafkaRecord that was
>> suppressing errors from being reported. Haven't been able to fully figure
>> out what was going on, but I know one of the contributing factors was that
>> my producer (spring boot service, not NiFi) was incorrectly building the
>> binary form of the Avro (didn't have embedded schema).
>> >
>> > Mike
>> >
>> > On Tue, Nov 13, 2018 at 12:33 PM Joe Witt <[email protected]> wrote:
>> >>
>> >> Viking
>> >>
>> >> Just to clarify it isn't that it is trying to merge records to create
>> >> larger flow files but rather it is 'playing the kafka interaction as
>> >> it lies' so to speak.  When polling from Kafka we can get one or more
>> >> records.  We're just taking advantage of that without trading off
>> >> speed.
>> >>
>> >> Thanks
>> >> On Tue, Nov 13, 2018 at 12:24 PM Viking K <[email protected]>
>> wrote:
>> >> >
>> >> > Mike, are we talking about complete Avro messages or bare records
>> (is the schema contained inside the Avro file, or do they use a schema registry)?
>> >> >
>> >> > From my own testing the ConsumeKafkaRecord tries to bundle in the
>> incoming messages to create larger flow files.
>> >> > Do you use any Kafka headers in the processor?
>> >> >
>> >> > Also what happens if you try to replicate the behavior of
>> ConsumeKafkaRecord like this. I don't know if you need the ConvertRecord
>> but it might be needed to pick out a schema name to use as Merge strategy.
>> >> > ConsumeKafka -> (ConvertRecord) -> Merge Content
>> >> >
>> >> > /Viking
>> >> > ________________________________
>> >> > From: Mike Thomsen <[email protected]>
>> >> > Sent: Tuesday, November 13, 2018 3:02 PM
>> >> > To: [email protected]
>> >> > Subject: Re: ConsumeKafkaRecord won't pull new events from Kafka
>> >> >
>> >> > Closest thing I see to something that implies something might be
>> awry is this in nifi-app.log:
>> >> >
>> >> > javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
>> >> >     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>> >> >     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>> >> >     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>> >> >     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>> >> >     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>> >> >     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>> >> >     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
>> >> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:640)
>> >> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
>> >> >     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
>> >> >     at org.apache.nifi.processors.kafka.pubsub.ConsumerPool.createKafkaConsumer(ConsumerPool.java:143)
>> >> >     at org.apache.nifi.processors.kafka.pubsub.ConsumerPool.obtainConsumer(ConsumerPool.java:107)
>> >> >     at org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.onTrigger(ConsumeKafka.java:359)
>> >> >     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>> >> >     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
>> >> >     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
>> >> >     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>> >> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> >> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> >> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> >> >     at java.lang.Thread.run(Thread.java:748)
>> >> >
>> >> >
>> >> >
>> >> > On Tue, Nov 13, 2018 at 10:00 AM Pierre Villard <
>> [email protected]> wrote:
>> >> >
>> >> > Hey Mike,
>> >> >
>> >> > Anything in the logs?
>> >> >
>> >> > Pierre
>> >> >
>> >> > On Tue, Nov 13, 2018 at 15:56, Mike Thomsen <[email protected]>
>> wrote:
>> >> >
>> >> > I have an odd situation where I have ConsumeKafkaRecord and
>> ConsumeKafka pulling from the same topic under different consumer groups,
>> but only the latter will pull new events. I ran into a situation where the
>> reader didn't like the Avro data being pulled from the queue and so I
>> created new topics and configured both processors to use the new ones.
>> However, only the non-record version of the processor will read.
>> >> >
>> >> > Anyone got suggestions on how to debug this? I'm reasonably familiar
>> with Kafka, but can't figure out why ConsumeKafka and the console consumer
>> can read the topic, but ConsumeKafkaRecord is acting like there's nothing
>> there at all.
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Mike
>>
>
>
