I had it running and was adding new content to the queue that other consumers were able to see.
On Tue, Nov 13, 2018 at 1:15 PM Mark Payne <[email protected]> wrote:

Mike,

Is there new data coming into the Kafka topic? By default, when the Processor is started, it uses an auto commit offset of 'latest'. So that means that if you started the Processor with this setting, the commit offset is saved pointing to the end of the topic. So if no more data is coming into the topic, you'll not see anything out of the processor. Could the ConsumeKafka processor have been started with the offset at 'earliest'?

Thanks
-Mark
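Mark's 'latest' vs 'earliest' distinction corresponds to the Kafka consumer's auto.offset.reset property, which only takes effect when the consumer group has no committed offset yet. A minimal sketch with the plain Kafka client illustrating the difference (the broker address, topic, and group id below are placeholders, not values from this thread):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class OffsetResetDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // A brand-new group has no committed offset, so auto.offset.reset applies.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-reset-demo");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            // 'earliest' replays the topic from the beginning for a group with no
            // committed offset; 'latest' (the default) starts at the end, so an idle
            // topic yields nothing, which matches the behavior Mark describes.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(5));
                System.out.println("Fetched " + records.count() + " records");
            }
        }
    }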
On Nov 13, 2018, at 12:54 PM, Mike Thomsen <[email protected]> wrote:

That would appear to be the case. So here's what I was doing:

1. Used this sort of code to serialize the Avro:

    private byte[] serialize(Object obj, Class clz) throws Exception {
        SpecificDatumWriter writer = new SpecificDatumWriter<>(clz);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        //writer.write(obj, encoder);
        //encoder.flush();
        out.close();

        return out.toByteArray();
    }

So that got me a byte array with just the binary representation.

2. Produced that to the Kafka topic.

3. Had an AvroReader variously configured to use schema.name hard-set to an entry in the AvroSchemaRegistry, or to use the embedded schema.

Didn't see any flowfiles get emitted to parse.failure or success.

On Tue, Nov 13, 2018 at 12:50 PM Joe Witt <[email protected]> wrote:

Mike - so does this mean the parse.failure relationship wasn't working though? We should try to dig into this more if you're up for it or sharing more details. We don't want the behavior you ran into for sure...

On Tue, Nov 13, 2018 at 12:49 PM Mike Thomsen <[email protected]> wrote:

So after a lot of diving into 1.9.X, it **appears** that there was some sort of combination w/ the AvroReader + ConsumeKafkaRecord that was suppressing errors from being reported. I haven't been able to fully figure out what was going on, but I know one of the contributing factors was that my producer (a Spring Boot service, not NiFi) was incorrectly building the binary form of the Avro (it didn't have an embedded schema).

Mike
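One thing worth flagging in the serialize() snippet quoted above: writer.write() and encoder.flush() are commented out, so as posted the method returns an empty byte array. A rough corrected sketch of the two ways the producer could have written the Avro follows, i.e. bare binary (no embedded schema, so the reader needs a schema registry / schema.name) versus an Avro data file with the schema embedded; the class and method names are illustrative, not taken from the original service:

    import java.io.ByteArrayOutputStream;

    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.avro.specific.SpecificRecord;

    public class AvroSerde {

        // Bare binary Avro: no schema embedded, so the consuming AvroReader must be
        // told the schema explicitly (e.g. schema.name + AvroSchemaRegistry).
        public static <T extends SpecificRecord> byte[] serializeBare(T record) throws Exception {
            SpecificDatumWriter<T> writer = new SpecificDatumWriter<>(record.getSchema());
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(record, encoder);   // these two calls were commented out above
            encoder.flush();
            out.close();
            return out.toByteArray();
        }

        // Avro data file format: the schema travels with the payload, so a reader
        // configured to use the embedded schema can parse it.
        public static <T extends SpecificRecord> byte[] serializeWithEmbeddedSchema(T record) throws Exception {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (DataFileWriter<T> fileWriter = new DataFileWriter<T>(new SpecificDatumWriter<T>(record.getSchema()))) {
                fileWriter.create(record.getSchema(), out);
                fileWriter.append(record);
            }
            return out.toByteArray();
        }
    }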
On Tue, Nov 13, 2018 at 12:33 PM Joe Witt <[email protected]> wrote:

Viking

Just to clarify, it isn't that it is trying to merge records to create larger flow files, but rather it is 'playing the Kafka interaction as it lies', so to speak. When polling from Kafka we can get one or more records. We're just taking advantage of that without trading off speed.

Thanks

On Tue, Nov 13, 2018 at 12:24 PM Viking K <[email protected]> wrote:

Mike, are we talking about complete Avro messages or bare records (i.e. is the schema contained inside the Avro file, or do they use a schema registry)?

From my own testing, ConsumeKafkaRecord tries to bundle the incoming messages to create larger flow files.
Do you use any Kafka headers in the processor?

Also, what happens if you try to replicate the behavior of ConsumeKafkaRecord like this? I don't know if you need the ConvertRecord, but it might be needed to pick out a schema name to use as the merge strategy.

ConsumeKafka -> (ConvertRecord) -> MergeContent

/Viking

________________________________
From: Mike Thomsen <[email protected]>
Sent: Tuesday, November 13, 2018 3:02 PM
To: [email protected]
Subject: Re: ConsumeKafkaRecord won't pull new events from Kafka

Closest thing I see to something that implies something might be awry is this in nifi-app.log:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:640)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
        at org.apache.nifi.processors.kafka.pubsub.ConsumerPool.createKafkaConsumer(ConsumerPool.java:143)
        at org.apache.nifi.processors.kafka.pubsub.ConsumerPool.obtainConsumer(ConsumerPool.java:107)
        at org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.onTrigger(ConsumeKafka.java:359)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
        at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

On Tue, Nov 13, 2018 at 10:00 AM Pierre Villard <[email protected]> wrote:

Hey Mike,

Anything in the logs?

Pierre

On Tue, Nov 13, 2018 at 3:56 PM Mike Thomsen <[email protected]> wrote:

I have an odd situation where I have ConsumeKafkaRecord and ConsumeKafka pulling from the same topic under different consumer groups, but only the latter will pull new events. I ran into a situation where the reader didn't like the Avro data being pulled from the queue, and so I created new topics and configured both processors to use the new ones. However, only the non-record version of the processor will read.

Anyone got suggestions on how to debug this? I'm reasonably familiar with Kafka, but can't figure out why ConsumeKafka and the console consumer can read the topic, but ConsumeKafkaRecord is acting like there's nothing there at all.

Thanks,

Mike
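For the "acting like there's nothing there at all" symptom, one debugging angle in line with Mark's later reply is to compare the committed offsets of the consumer group used by ConsumeKafkaRecord against the topic's end offsets; if they already match, the processor has nothing left to poll. A rough sketch with the Kafka AdminClient, where the group id and broker address are placeholders for whatever is actually configured on the processor:

    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class GroupOffsetCheck {
        public static void main(String[] args) throws Exception {
            String bootstrap = "localhost:9092";
            String group = "consume-kafka-record-group";   // the Group ID set on ConsumeKafkaRecord

            Properties adminProps = new Properties();
            adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);

            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-check");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            try (AdminClient admin = AdminClient.create(adminProps);
                 KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {

                // Offsets the record processor's group has committed so far.
                Map<TopicPartition, OffsetAndMetadata> committed =
                        admin.listConsumerGroupOffsets(group)
                             .partitionsToOffsetAndMetadata()
                             .get();

                // Current end of each of those partitions.
                Map<TopicPartition, Long> end = consumer.endOffsets(committed.keySet());

                committed.forEach((tp, om) -> System.out.println(
                        tp + ": committed=" + om.offset() + ", end=" + end.get(tp)));
            }
        }
    }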
