[GitHub] [nifi] markap14 commented on pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key
markap14 commented on PR #6131:
URL: https://github.com/apache/nifi/pull/6131#issuecomment-1224840438

@greyp9 So, given the above (I will address in reverse order because I think you started with the hardest to describe and got easier as you went down the list) :) ...

The last point, about the "Message Key Field" property, I think is perfectly accurate.

As for the headers:
- I think what you said is accurate, but to clarify:
  - If (Use Wrapper) - the headers to send would be a single header. Its name would be "headerA" and its value would be "headerAValue". FlowFile attributes would not be sent as headers.
  - Else, the headers would be any FlowFile attribute that matches the "Attributes to Send as Headers (Regex)" property.

Now, as for the other...

```
if (Use Wrapper) {
  Kafka Record:
    Key = { "type": "person" }
    Value = { "name": "Mark", "number": 49 }
    Headers = A single header with name "headerA", value "headerAValue"
} else {
  Kafka Record:
    Key =
    Value = {
      "key": { "type": "person" },
      "value": { "name": "Mark", "number": 49 },
      "headers": { "headerA": "headerAValue" }
    }
    Headers =
}
```

So, in short, if Use Wrapper, the incoming FlowFile must have 3 fields:
- Key. This becomes the Kafka message key.
- Value. This becomes the contents of the Kafka message.
- Headers. This becomes the headers attached to the Kafka message.

Any other fields, such as metadata, would be ignored.

If NOT using the wrapper, it would function as it always has. The entire contents of the Record go as the Kafka message payload. The key and headers are determined based on the configured "Message Key Field" and "Attributes to Send as Headers (Regex)" properties.

Does that make sense? Or have I confused things even worse? :)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
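The two publish strategies described in the comment above can be sketched as a small mapping function. This is only an illustration in Python, not NiFi's Java implementation: `KafkaMessage`, `to_kafka_message`, and all parameter names are hypothetical, the lowercase field names `key`, `value`, and `headers` are taken from the example payload in the thread, and matching the regex against the whole attribute name is an assumption.

```python
import re
from typing import Any, Dict, Mapping, NamedTuple, Optional


class KafkaMessage(NamedTuple):
    """Hypothetical stand-in for a Kafka record; not a NiFi or Kafka client type."""
    key: Any
    value: Any
    headers: Dict[str, Any]


def to_kafka_message(
    record: Mapping[str, Any],
    attributes: Mapping[str, str],
    use_wrapper: bool,
    message_key_field: Optional[str] = None,
    header_regex: Optional[str] = None,
) -> KafkaMessage:
    if use_wrapper:
        # Wrapper strategy: only "key", "value" and "headers" are read.
        # Other fields (e.g. metadata) and FlowFile attributes are ignored.
        # A missing field raises KeyError in this sketch.
        return KafkaMessage(
            key=record["key"],
            value=record["value"],
            headers=dict(record["headers"]),
        )

    # Non-wrapper strategy: the entire record is the message payload.
    key = record.get(message_key_field) if message_key_field else None
    headers: Dict[str, Any] = {}
    if header_regex:
        # Assumption: the regex must match the whole attribute name.
        pattern = re.compile(header_regex)
        headers = {
            name: value
            for name, value in attributes.items()
            if pattern.fullmatch(name)
        }
    return KafkaMessage(key=key, value=dict(record), headers=headers)


record = {
    "key": {"type": "person"},
    "value": {"name": "Mark", "number": 49},
    "headers": {"headerA": "headerAValue"},
}
attributes = {"headerB": "fromAttribute", "filename": "person.json"}

print(to_kafka_message(record, attributes, use_wrapper=True))
print(to_kafka_message(record, attributes, use_wrapper=False,
                       message_key_field="key", header_regex="header.*"))
```

With the wrapper, the FlowFile attributes play no part; without it, the record is passed through whole and the key and headers come only from the two configured properties.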
[GitHub] [nifi] markap14 commented on pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key
markap14 commented on PR #6131:
URL: https://github.com/apache/nifi/pull/6131#issuecomment-1222917037

I also tried changing the "Publish Strategy" to "Use Record Content as Value" so that I could set the "Key Field." I then set the Key Field to `key`, and then changed the Publish Strategy back to "Use Wrapper".

This time, because the Key Field was specified, it sent the data. But what it sent was not correct. It sent the key correctly. But for the Kafka message, it sent the entire payload, not just the `value` section. And it sent no headers. So it appears to behave as if the Publish Strategy were still set to "Use Record Content as Value".
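To make the reported discrepancy concrete, the sketch below compares what "Use Wrapper" is expected to publish with what was observed. It assumes the test payload is the `key` / `value` / `headers` JSON from the related comment on this PR; the variable names are illustrative only, and the "observed" values simply restate the description above rather than captured Kafka output.

```python
# Assumed test payload (taken from the related comment on this PR).
payload = {
    "key": {"type": "person"},
    "value": {"name": "Mark", "number": 49},
    "headers": {"headerA": "headerAValue"},
}

# What "Use Wrapper" is expected to publish.
expected = {
    "key": payload["key"],
    "value": payload["value"],
    "headers": payload["headers"],
}

# What was described as actually sent: correct key, whole payload as value, no headers.
observed = {
    "key": payload["key"],
    "value": payload,
    "headers": {},
}

mismatches = [part for part in ("key", "value", "headers")
              if expected[part] != observed[part]]
print(mismatches)  # ['value', 'headers']
```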
[GitHub] [nifi] markap14 commented on pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key
markap14 commented on PR #6131:
URL: https://github.com/apache/nifi/pull/6131#issuecomment-1222894569

Thanks for updating @greyp9 . Trying this out again. I just tried sending the following content via PublishKafkaRecord_2_6:

```
{
  "key": {
    "type": "person"
  },
  "value": {
    "name": "Mark",
    "number": 49
  },
  "headers": {
    "headerA": "headerAValue"
  }
}
```

I set Publish Strategy to "Use Wrapper" and used a JsonTreeReader as the record reader. But I encountered a NullPointerException:

```
2022-08-22 15:58:22,792 ERROR [Timer-Driven Process Thread-2] o.a.n.p.k.pubsub.PublishKafkaRecord_2_6 PublishKafkaRecord_2_6[id=c71d2b54-0182-1000-4979-53ab4cc8d555] Failed to send StandardFlowFileRecord[uuid=e8d03807-fa13-4620-be6a-72e014ac0838,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1661197835339-1, container=default, section=1], offset=3200, length=145],offset=0,name=e8d03807-fa13-4620-be6a-72e014ac0838,size=145] to Kafka
java.lang.NullPointerException: null
	at org.apache.nifi.processors.kafka.pubsub.PublisherLease.toWrapperRecord(PublisherLease.java:262)
	at org.apache.nifi.processors.kafka.pubsub.PublisherLease.publish(PublisherLease.java:210)
	at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6$1.process(PublishKafkaRecord_2_6.java:521)
	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2693)
	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2661)
	at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6.onTrigger(PublishKafkaRecord_2_6.java:513)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1357)
	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
```

One other thing that I noticed, though it's minor: when I change the "Publish Strategy" to "Use Wrapper", I'm given the option to configure the "Record Key Writer" property. But this is way down the list, about 8 or 10 properties down, so it's not at all obvious that it has been made available. We probably want to move the "Record Key Writer" property to just after the "Publish Strategy" property, since it depends on it.