[GitHub] [nifi] markap14 commented on pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-08-23 Thread GitBox


markap14 commented on PR #6131:
URL: https://github.com/apache/nifi/pull/6131#issuecomment-1224840438

   @greyp9 So, given the above (I will address in reverse order, because I think 
you started with the hardest to describe and they get easier going down the 
list) :) ...
   
   The last point, about the "Message Key Field" property, I think is perfectly 
accurate.
   
   As for the headers:
   - I think what you said is accurate, but to clarify:
   - If (Use Wrapper) - the headers to send would be a single header. Its name 
would be "headerA" and its value would be "headerAValue". FlowFile attributes 
would not be sent as headers.
   - Else, the headers would be any FlowFile attribute that matches the 
"Attributes to Send as Headers (Regex)" property
   
   Now, as for the other...
   ```
   if (Use Wrapper) {
       Kafka Record:
           Key = { "type": "person" }
           Value = { "name": "Mark", "number": 49 }
           Headers = A single header with name "headerA", value "headerAValue"
   } else {
       Kafka Record:
           Key = 
           Value = 
           {
             "key": {
               "type": "person"
             },
             "value": {
               "name": "Mark",
               "number": 49
             },
             "headers": {
               "headerA": "headerAValue"
             }
           }
           Headers = 
   }
   ```
   
   So, in short, if Use Wrapper, the incoming FlowFile must have 3 fields:
   - Key. This becomes the Kafka message key.
   - Value. This becomes the contents of the Kafka message.
   - Headers. These become the headers attached to the Kafka message.
   
   Any other fields, such as metadata, would be ignored.
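   The wrapper behavior described above can be sketched as follows. This is a 
minimal illustration under stated assumptions, not the processor's actual code: 
the function name is hypothetical, and the sample record mirrors the example 
earlier in this thread.

   ```python
   import json

   def split_wrapper_record(record_json):
       """Split a 'Use Wrapper' record into the three parts described above;
       any other top-level fields (e.g. metadata) are ignored."""
       record = json.loads(record_json)
       return record.get("key"), record.get("value"), record.get("headers")

   wrapper = '''{
     "key": {"type": "person"},
     "value": {"name": "Mark", "number": 49},
     "headers": {"headerA": "headerAValue"},
     "metadata": {"ignored": true}
   }'''
   key, value, headers = split_wrapper_record(wrapper)
   ```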
   
   If NOT using the wrapper, it would function as it always has. The entire 
contents of the Record go as the Kafka message payload. The key and headers are 
determined based on the configured "Message Key Field" and "Attributes to Send 
as Headers (Regex)" properties.
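   The non-wrapper key behavior can be sketched similarly: the whole record is 
the payload, and the key is looked up by the configured "Message Key Field". 
Again, this is only an illustration under stated assumptions; the function name 
and the sample record are hypothetical, and headers (from FlowFile attributes) 
are out of scope here.

   ```python
   import json

   def non_wrapper_parts(record_json, message_key_field):
       """Non-wrapper behavior as described above: the whole record becomes
       the Kafka payload; the key is the value of the configured
       'Message Key Field'. Headers come from FlowFile attributes instead."""
       record = json.loads(record_json)
       return record.get(message_key_field), record

   # Hypothetical record, with "id" configured as the Message Key Field.
   record = '{"id": "abc-123", "name": "Mark", "number": 49}'
   key, payload = non_wrapper_parts(record, "id")
   ```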
   
   Does that make sense? Or have I confused things even worse? :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi] markap14 commented on pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-08-22 Thread GitBox


markap14 commented on PR #6131:
URL: https://github.com/apache/nifi/pull/6131#issuecomment-1222917037

   I also tried changing the "Publish Strategy" to "Use Record Content as 
Value" so that I could set the "Key Field." I then set the Key Field to `key`. 
And then changed the Publish Strategy back to "Use Wrapper".
   This time, because the Key Field was specified, it sent the data. But what 
it sent was not correct.
   It sent the key correctly. But for the Kafka message, it sent the entire 
payload, not just the `value` section.
   And it sent no headers. So it appears to behave as if the Publish Strategy 
was still set to "Use Record Content as Value".





[GitHub] [nifi] markap14 commented on pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-08-22 Thread GitBox


markap14 commented on PR #6131:
URL: https://github.com/apache/nifi/pull/6131#issuecomment-1222894569

   Thanks for updating @greyp9 . Trying this out again.
   I just tried sending the following content via PublishKafkaRecord_2_6:
   ```
   {
     "key": {
       "type": "person"
     },
     "value": {
       "name": "Mark",
       "number": 49
     },
     "headers": {
       "headerA": "headerAValue"
     }
   }
   ```
   I set Publish Strategy to "Use Wrapper" and used a JsonTreeReader as the 
record reader.
   But I encountered a NullPointerException:
   ```
   2022-08-22 15:58:22,792 ERROR [Timer-Driven Process Thread-2] o.a.n.p.k.pubsub.PublishKafkaRecord_2_6 PublishKafkaRecord_2_6[id=c71d2b54-0182-1000-4979-53ab4cc8d555] Failed to send StandardFlowFileRecord[uuid=e8d03807-fa13-4620-be6a-72e014ac0838,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1661197835339-1, container=default, section=1], offset=3200, length=145],offset=0,name=e8d03807-fa13-4620-be6a-72e014ac0838,size=145] to Kafka
   java.lang.NullPointerException: null
       at org.apache.nifi.processors.kafka.pubsub.PublisherLease.toWrapperRecord(PublisherLease.java:262)
       at org.apache.nifi.processors.kafka.pubsub.PublisherLease.publish(PublisherLease.java:210)
       at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6$1.process(PublishKafkaRecord_2_6.java:521)
       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2693)
       at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2661)
       at org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6.onTrigger(PublishKafkaRecord_2_6.java:513)
       at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
       at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1357)
       at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
       at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
       at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
       at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source)
       at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
       at java.base/java.lang.Thread.run(Unknown Source)
   ```
   
   One other thing that I noticed, though it's minor: when I change the 
"Publish Strategy" to "Use Wrapper" I'm given the option to configure the 
"Record Key Writer" property. But this is way down the list, about 8 or 10 
properties down, so it's not at all obvious that it has been made available. 
Probably want to move "Record Key Writer" just after the "Publish Strategy" 
property, since it depends on it.

