[GitHub] [nifi] markap14 commented on a diff in pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-09-21 Thread GitBox


markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r976577008


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##
@@ -653,6 +636,131 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> consumerRecord, final TopicPartition topicPartition,
+            final Record record, final Map<String, String> attributes) throws SchemaNotFoundException, IOException {
+        RecordSetWriter writer = null;
+        // Determine the bundle for this record.
+        final RecordSchema recordSchema = record.getSchema();
+        final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null);
+
+        BundleTracker tracker = bundleMap.get(bundleInfo);
+        if (tracker == null) {
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            final OutputStream rawOut = session.write(flowFile);
+
+            final RecordSchema writeSchema;
+            try {
+                writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+            } catch (final Exception e) {
+                logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                rollback(topicPartition);
+                yield();
+
+                throw new ProcessException(e);
+            }
+
+            writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+            writer.beginRecordSet();
+
+            tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+            tracker.updateFlowFile(flowFile);
+            bundleMap.put(bundleInfo, tracker);
+        } else {
+            writer = tracker.recordWriter;
+        }
+
+        try {
+            writer.write(record);
+        } catch (final RuntimeException re) {
+            handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+            return writer;
+        }
+
+        tracker.incrementRecordCount(1L, consumerRecord.offset(), consumerRecord.leaderEpoch().orElse(null));
+        session.adjustCounter("Records Received", 1L, false);
+        return writer;
+    }
+
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey = toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue = toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders = toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+        if (KafkaProcessorUtils.KEY_AS_RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = getAttributes(consumerRecord);
+            try (final InputStream is = new ByteArrayInputStream(key)) {
+                try (final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger)) {
+                    final Record record = reader.nextRecord();
+                    final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+                    tuple = new Tuple<>(recordField, record);
+                }
+            }
+        } else if (KafkaProcessorUtils.KEY_AS_STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key",

[GitHub] [nifi] markap14 commented on a diff in pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-09-21 Thread GitBox


markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r976575896


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##
@@ -591,51 +607,18 @@ private void writeRecordData(final ProcessSession session, final List

[GitHub] [nifi] markap14 commented on a diff in pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-07-27 Thread GitBox


markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r931354937


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##
@@ -239,6 +251,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
                 + "If not specified, no FlowFile attributes will be added as headers.")
             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
             .expressionLanguageSupported(NONE)
+            .dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_VALUE)

Review Comment:
   I'm not sure that it's necessary here. In the case of the Consumer, we are passing it down the stack (or "up the stack?") so that the lease knows what to do with it. Here, it doesn't look like we're passing the Publish Strategy to any other classes.
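
For context on the `.dependsOn(...)` line being discussed: in NiFi's PropertyDescriptor API, `dependsOn` makes a property visible (and validated) only when the referenced property is set to one of the listed values. A minimal sketch of the mechanism, using hypothetical descriptor names rather than anything from the PR:

```
// Hypothetical names: SOME_OPTION only appears when STRATEGY is set to the STRATEGY_A allowable value.
static final PropertyDescriptor SOME_OPTION = new PropertyDescriptor.Builder()
        .name("some-option")
        .displayName("Some Option")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .dependsOn(STRATEGY, STRATEGY_A)
        .required(false)
        .build();
```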






[GitHub] [nifi] markap14 commented on a diff in pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-07-27 Thread GitBox


markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r931325919


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##
@@ -593,6 +609,9 @@ private void writeRecordData(final ProcessSession session, final List

[GitHub] [nifi] markap14 commented on a diff in pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-07-26 Thread GitBox


markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r930324902


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##
@@ -256,5 +256,133 @@ SASL_SSL
 ssl.client.auth property.
 
 
+Output Modes
+
+This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property 'Consume Strategy') for converting Kafka records into FlowFiles.
+
+Consume Strategy 'Write Value Only' (the default) emits flowfile records containing only the Kafka record value.
+
+Consume Strategy 'Use Wrapper' (new) emits flowfile records containing the Kafka record key, value, and headers, as well as additional metadata from the Kafka record.
+
+The record schema that is used when 'Use Wrapper' is active is as follows (in Avro format):
+
+[
+  {
+    "type": "record",
+    "name": "kafka:ConsumeRecord:metadata",
+    "namespace": "org.apache.nifi",
+    "fields": [{
+      "name": "key",
+      "type": ["bytes", "string", "record"]
+    }, {
+      "name": "topic",
+      "type": "string"
+    }, {
+      "name": "partition",
+      "type": "int"
+    }, {
+      "name": "offset",
+      "type": "long"
+    }, {
+      "name": "timestamp",
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    }]
+  },
+  {
+    "type": "record",
+    "name": "kafka:ConsumeRecord:wrapper",
+    "namespace": "org.apache.nifi",
+    "fields": [{
+      "name": "key",
+      "type": ["bytes", "string", "record"]
+    }, {
+      "name": "value",
+      "type": "record"
+    }, {
+      "name": "headers",
+      "type": "map",
+      "values": "string"
+    }, {
+      "name": "metadata",
+      "type": "kafka:ConsumeRecord:metadata"
+    }]
+  }
+]

Review Comment:
   I think I would document the schema as:
   ```
   {
     "type": "record",
     "name": "nifiRecord",
     "namespace": "org.apache.nifi",
     "fields": [{
         "name": "key",
         "type": [{

           }, "null"]
       },
       {
         "name": "value",
         "type": [
           {

           },
           "null"
         ]
       },
       {
         "name": "headers",
         "type": [
           { "type": "map", "values": "string", "default": {} },
           "null"
         ]
       },
       {
         "name": "metadata",
         "type": [
           {
             "type": "record",
             "name": "metadataType",
             "fields": [
               { "name": "topic", "type": ["string", "null"] },
               { "name": "partition", "type": ["int", "null"] },
               { "name": "offset", "type": ["int", "null"] },
               { "name": "timestamp", "type": ["long", "null"] }
             ]
           },
           "null"
         ]
       }
     ]
   }
   ```



##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##
@@ -256,5 +256,133 @@ SASL_SSL
 ssl.client.auth property.
 
 
+Output Modes
+
+This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property 'Consume

Review Comment:
   Looks like this will likely not make it into 1.17, unfortunately. Will probably need to update that comment. Can just remove the version altogether, as we generate documentation per version.



##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##
@@ -271,7 +284,13 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
             .defaultValue("UTF-8")
             .required(false)
             .build();
-
+    static final PropertyDescriptor RECORD_KEY_WRITER = new PropertyDescriptor.Builder()
+            .name("record-key-writer")
+            .displayName("Record Key Writer")

Review Comment:
   Given that we're adding a Record Key Writer, we should probably change the display name of "Record Writer" to "Record Value Writer" for clarity.



##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##
@@ -212,6 +217,31 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
             .defaultValue("UTF-8")
             .required(false)
             .build();
+    static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("output-strategy")
+            .displayName("Output Strategy")
+

[GitHub] [nifi] markap14 commented on a diff in pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-07-14 Thread GitBox


markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r921594339


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##
@@ -193,10 +196,24 @@ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSet
                 additionalAttributes = writeResult.getAttributes();
                 writer.flush();
             }
-
             final byte[] messageContent = baos.toByteArray();
-            final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
-            final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+
+            final byte[] messageKey;
+            if ((recordKeyWriterFactory == null) || (messageKeyField == null)) {
+                messageKey = Optional.ofNullable(record.getAsString(messageKeyField))
+                        .map(s -> s.getBytes(StandardCharsets.UTF_8)).orElse(null);
+            } else {
+                try (final ByteArrayOutputStream os = new ByteArrayOutputStream(1024)) {
+                    final Record keyRecord = Optional.ofNullable(record.getValue(messageKeyField))
+                            .filter(Record.class::isInstance).map(Record.class::cast)
+                            .orElseThrow(() -> new IOException("The property 'Record Key Writer' is defined, but the record key is not a record"));

Review Comment:
   I think I would actually do something like:
   ```
   final Object key;
   final Object keyValue = record.getValue(messageKeyField);
   if (keyValue == null) {
     key = null;
   } else if (keyValue instanceof byte[]) {
     key = (byte[]) keyValue;
   } else if (keyValue instanceof Byte[]) {
     // This case exists because in our Record API we currently don't have a BYTES type; we use an Array of type Byte,
     // which creates a Byte[] instead of a byte[]. We should address this in the future, but we should account for it here.
     final Byte[] boxed = (Byte[]) keyValue;
     final byte[] bytes = new byte[boxed.length];
     for (int i = 0; i < boxed.length; i++) {
       bytes[i] = boxed[i];
     }
     key = bytes;
   } else if (keyValue instanceof Record) {
     ... // what you have below, to create record writer, write key, flush
   } else {
     final String keyString = keyValue.toString();
     key = keyString.getBytes(StandardCharsets.UTF_8);
   }
   ```
   I.e., just look at the object that you have. Using the schema is also feasible, but it can get really complicated if the schema indicates a CHOICE type, because then you'll probably still end up having to do this type of logic.






[GitHub] [nifi] markap14 commented on a diff in pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-07-14 Thread GitBox


markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r921422679


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##
@@ -271,7 +273,21 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
             .defaultValue("UTF-8")
             .required(false)
             .build();
-
+    static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder()
+            .name("publish-strategy")
+            .displayName("Publish Strategy")
+            .description("The format used to publish the outgoing FlowFile record to Kafka.")
+            .required(true)
+            .defaultValue(WRITE_VALUE_ONLY.getValue())
+            .allowableValues(WRITE_VALUE_ONLY, USE_WRAPPER)

Review Comment:
   Yes, sorry, I was suggesting renaming it. Perhaps something like "Use Content as Record Value" or "FlowFile Content as Record Value" or something like that? Was just trying to convey that we aren't *only* writing the record value; a key and headers may also be written.






[GitHub] [nifi] markap14 commented on a diff in pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

2022-06-21 Thread GitBox


markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r902981711


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##
@@ -212,6 +217,31 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
             .defaultValue("UTF-8")
             .required(false)
             .build();
+    static final PropertyDescriptor CONSUME_STRATEGY = new PropertyDescriptor.Builder()
+            .name("consume-strategy")
+            .displayName("Consume Strategy")

Review Comment:
   This doesn't really control how we consume data from Kafka. Rather, it controls how we write out the data. So perhaps "Output Strategy" is more suitable?
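
A minimal sketch of what the renamed descriptor might look like, assuming the same PropertyDescriptor.Builder pattern shown in the diff above (the description wording here is illustrative, not taken from the PR):

```
static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
        .name("output-strategy")
        .displayName("Output Strategy")
        // Illustrative wording: conveys that this controls how consumed records are written out, not how they are consumed.
        .description("The format used to write the consumed Kafka record into the outgoing FlowFile record.")
        .required(true)
        .defaultValue(WRITE_VALUE_ONLY.getValue())
        .allowableValues(WRITE_VALUE_ONLY, USE_WRAPPER)
        .build();
```

This matches the `output-strategy` / "Output Strategy" naming that shows up in the later ConsumeKafkaRecord_2_6 diff in this thread.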



##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##
@@ -271,7 +273,21 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
             .defaultValue("UTF-8")
             .required(false)
             .build();
-
+    static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder()

Review Comment:
   With the addition of this property, I think some of the existing properties can also be hidden if using the wrapper. Specifically, just like in the consumer, we should make MESSAGE_KEY_FIELD and ATTRIBUTE_NAME_REGEX dependent properties, because it doesn't make sense to specify those if using the Wrapper.
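
A hedged sketch of that suggestion, assuming MESSAGE_KEY_FIELD is built with the same PropertyDescriptor.Builder pattern as the other properties in this file; the name and description strings are illustrative rather than copied from the processor:

```
static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder()
        .name("message-key-field")
        .displayName("Message Key Field")
        .description("The name of a field in the Record that should be used as the Kafka message key.")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
        // Only meaningful when the FlowFile content is published as the record value,
        // so hide the property when the wrapper strategy is selected.
        .dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_VALUE)
        .required(false)
        .build();
```

ATTRIBUTE_NAME_REGEX would get the same `.dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_VALUE)` line, which is what the earlier diff in this thread shows being added.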



##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java:
##
@@ -110,6 +110,12 @@ public final class KafkaProcessorUtils {
 "When unable to publish a FlowFile to Kafka, the FlowFile will be 
placed back on the top of its queue so that it will be the next FlowFile tried 
again. " +
 "For dataflows where ordering of FlowFiles is important, this 
strategy can be used along with ensuring that the each processor in the 
dataflow uses only a single Concurrent Task.");
 
+static final AllowableValue WRITE_VALUE_ONLY = new 
AllowableValue("write-value-only", "Write Value Only", "Write only the Kafka 
Record value.");
+static final AllowableValue USE_WRAPPER = new 
AllowableValue("use-wrapper", "Use Wrapper", "Write the Kafka Record key, 
value, headers, and metadata.");

Review Comment:
   Perhaps we should mention in the description explicitly that these fields 
will be wrapped in a wrapper element, and to see Processor Usage for more 
information.
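
A rough sketch of how those descriptions could be expanded as suggested; the wording is illustrative only, and the AllowableValue constructor follows the form already shown in the diff above:

```
static final AllowableValue WRITE_VALUE_ONLY = new AllowableValue("write-value-only", "Write Value Only",
        "Write only the Kafka Record value into the FlowFile record.");
static final AllowableValue USE_WRAPPER = new AllowableValue("use-wrapper", "Use Wrapper",
        "Write the Kafka Record key, value, headers, and metadata as fields of a single wrapper record. "
                + "See the Processor Usage (additionalDetails) for the resulting schema.");
```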



##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##
@@ -256,5 +256,69 @@ SASL_SSL
 ssl.client.auth property.


+Output Modes
+
+This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property 'Consume Strategy') for converting Kafka records into FlowFiles.
+
+Consume Strategy 'Write Value Only' (the default) emits flowfile records containing only the Kafka record value.
+
+Consume Strategy 'Use Wrapper' (new) emits flowfile records containing the Kafka record key, value, and headers, as well as additional metadata from the Kafka record.
+
+The record schema that is used when 'Use Wrapper' is active is as follows:
+
+key: one of RECORD, STRING, or BYTE_ARRAY (based on processor configuration property 'Key Format')
+value: RECORD
+headers: MAP (STRING, STRING)
+metadata: RECORD
+    topic: STRING
+    partition: INTEGER
+    offset: LONG
+    timestamp: TIMESTAMP
+
+
+
+If the Consume Strategy property 'Use Wrapper' is active, an additional processor configuration property ('Key Format') is activated. This property is used to fine-tune the transformation of the incoming Kafka record. The possible values for 'Key Format' are 'Byte Array', 'String', or 'Record'.
+
+'Byte Array' supplies the Kafka Record Key bytes unchanged from the incoming Kafka record.
+'String' converts the Kafka Record Key bytes into a string using the UTF-8 character encoding. (Failure to parse the key bytes as UTF-8 will result in the record being routed to the 'parse.failure' relationship.)
+'Record' converts the Kafka