Matt,

there is nothing more in the log about the error. I also tried hardcoding the schema name, and I tried providing the schema as 'Schema Text' without the AvroSchemaRegistry, but I always get the same error.

Maybe the problem is somewhere else: the schema itself, line breaks in the schema, or something missing in the schema?

Thanks Matt,
Uwe

Sent: Thursday, 22 June 2017 at 21:54
From: "Matt Burgess" <[email protected]>
To: [email protected]
Subject: Re: Nifi 1.3.0 - Problems with ConsumeKafkaRecord_0_10

Uwe,

It looks like this error is directly related to your other question. This line from the stack trace:

Caused by: java.lang.NullPointerException: null
    at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)

is where it calls record.getSchema(). I'm not sure whether the "real" error message is getting lost or is perhaps further down in your stack trace, but it looks like an error with finding or reading the schema.

Regards,
Matt

On Thu, Jun 22, 2017 at 3:30 PM, Uwe Geercken <[email protected]> wrote:
> Hello everyone,
>
> I wanted to try the following:
> - get messages from a Kafka topic. These are simple messages in CSV format
> - use the PartitionRecord processor to get familiar with the RecordPath concept
>
> I started ZooKeeper and Kafka on localhost and added some messages to a topic using the Kafka console producer. A message looks like this:
>
> ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200
>
> I can retrieve this message using the Kafka console consumer.
>
> To my flow I added the ConsumeKafkaRecord_0_10 and the PartitionRecord processor. I configured the ConsumeKafkaRecord_0_10 with a CSVReader controller.
> It uses the AvroSchemaRegistry 1.3.0 with following schema:
>
> {
>   "type": "record",
>   "name": "flight_schema",
>   "fields": [
>     { "name": "flight_station", "type": "string" },
>     { "name": "flight_orientation", "type": "string" },
>     { "name": "flight_carrier", "type": "string" },
>     { "name": "flight_number", "type": "string" },
>     { "name": "flight_number_suffix", "type": "string" },
>     { "name": "flight_scheduled_date", "type": "string" },
>     { "name": "flight_scheduled_time", "type": "string" },
>     { "name": "flight_actual_date", "type": "string" },
>     { "name": "flight_actual_time", "type": "string" },
>     { "name": "flight_passengers", "type": "int" }
>   ]
> }
>
> And then I have a CSVRecordWriter which uses the same schema. I also added a line to logback.xml to debug the ConsumeKafkaRecord_0_10 processor.
>
> Now when I run the processors and add a message to the topic in kafka I get following error:
>
> 2017-06-22 18:32:11,228 ERROR [Timer-Driven Process Thread-7] o.a.n.p.k.pubsub.ConsumeKafkaRecord_0_10 ConsumeKafkaRecord_0_10[id=cb353b32-015c-1000-0ed2-0753cceaa542] Exception while processing data from kafka so will close the lease org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@4f137cc2 due to org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException: org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
> org.apache.nifi.processor.exception.ProcessException: java.lang.NullPointerException
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:514)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.lambda$processRecords$8(ConsumerLease.java:320)
>     at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
>     at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.processRecords(ConsumerLease.java:307)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.poll(ConsumerLease.java:168)
>     at org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.onTrigger(ConsumeKafkaRecord_0_10.java:327)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException: null
>     at org.apache.nifi.processors.kafka.pubsub.ConsumerLease.writeRecordData(ConsumerLease.java:458)
>     ... 18 common frames omitted
>
> I have played around with the settings of the processors and controllers quite a lot, but always get this NullPointerException.
>
> I then added a ConsumeKafka_0_10 1.3.0 processor to verify that I can retrieve the messages and it does work.
>
> I hope that someone can point out what the problem is and help me.
>
> Greetings,
>
> Uwe
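
On the question of whether the schema itself or its line breaks could be at fault: one way to narrow that down is to check the schema text and the sample message outside NiFi. The following is only a minimal sketch in plain Python, not NiFi's or Avro's own parsing. It assumes the schema text and the sample message exactly as posted in this thread; the names check_message and CONVERTERS are hypothetical helpers made up for the illustration. It confirms that the schema is well-formed JSON with its line breaks included, that the CSV line has one value per schema field, and that each value converts to the declared type.

```python
import csv
import io
import json

# Schema text as posted in this thread, line breaks included.
SCHEMA_TEXT = """
{
  "type": "record",
  "name": "flight_schema",
  "fields": [
    { "name": "flight_station", "type": "string" },
    { "name": "flight_orientation", "type": "string" },
    { "name": "flight_carrier", "type": "string" },
    { "name": "flight_number", "type": "string" },
    { "name": "flight_number_suffix", "type": "string" },
    { "name": "flight_scheduled_date", "type": "string" },
    { "name": "flight_scheduled_time", "type": "string" },
    { "name": "flight_actual_date", "type": "string" },
    { "name": "flight_actual_time", "type": "string" },
    { "name": "flight_passengers", "type": "int" }
  ]
}
"""

# Sample message as posted in this thread.
SAMPLE_MESSAGE = "ZRH,departure,LX,1000,F,2017-06-22,10:00,2017-06-22,10:05,200"

# Hypothetical mapping for this sketch: only the two types used above.
CONVERTERS = {"string": str, "int": int}


def check_message(schema_text, message):
    """Parse the schema as JSON and map one CSV line onto its fields."""
    schema = json.loads(schema_text)  # raises ValueError on malformed JSON
    fields = schema["fields"]
    values = next(csv.reader(io.StringIO(message)))
    if len(values) != len(fields):
        raise ValueError(
            "schema has %d fields, message has %d values"
            % (len(fields), len(values))
        )
    # Convert each value according to the declared field type.
    return {
        field["name"]: CONVERTERS[field["type"]](value)
        for field, value in zip(fields, values)
    }


record = check_message(SCHEMA_TEXT, SAMPLE_MESSAGE)
print(record)
```

If this runs without raising, the schema text and the message are at least consistent with each other, which would point back toward how the reader looks up the schema rather than toward the schema's content. It does not exercise the AvroSchemaRegistry lookup or the CSVReader configuration.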
