[GitHub] [nifi] markap14 commented on a diff in pull request #6131: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key
markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r976577008

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##
@@ -653,6 +636,131 @@ private void writeRecordData(final ProcessSession session, final List consumerRecord, final TopicPartition topicPartition,
+                                        final Record record, final Map attributes) throws SchemaNotFoundException, IOException {
+        RecordSetWriter writer = null;
+        // Determine the bundle for this record.
+        final RecordSchema recordSchema = record.getSchema();
+        final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null);
+
+        BundleTracker tracker = bundleMap.get(bundleInfo);
+        if (tracker == null) {
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            final OutputStream rawOut = session.write(flowFile);
+
+            final RecordSchema writeSchema;
+            try {
+                writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+            } catch (final Exception e) {
+                logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                rollback(topicPartition);
+                yield();
+
+                throw new ProcessException(e);
+            }
+
+            writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+            writer.beginRecordSet();
+
+            tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+            tracker.updateFlowFile(flowFile);
+            bundleMap.put(bundleInfo, tracker);
+        } else {
+            writer = tracker.recordWriter;
+        }
+
+        try {
+            writer.write(record);
+        } catch (final RuntimeException re) {
+            handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+            return writer;
+        }
+
+        tracker.incrementRecordCount(1L, consumerRecord.offset(), consumerRecord.leaderEpoch().orElse(null));
+        session.adjustCounter("Records Received", 1L, false);
+        return writer;
+    }
+
+    private MapRecord toWrapperRecord(final ConsumerRecord consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple tupleKey = toWrapperRecordKey(consumerRecord);
+        final Tuple tupleValue = toWrapperRecordValue(record);
+        final Tuple tupleHeaders = toWrapperRecordHeaders(consumerRecord);
+        final Tuple tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+        final Map recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple toWrapperRecordKey(final ConsumerRecord consumerRecord)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+        if (KafkaProcessorUtils.KEY_AS_RECORD.getValue().equals(keyFormat)) {
+            final Map attributes = getAttributes(consumerRecord);
+            try (final InputStream is = new ByteArrayInputStream(key)) {
+                try (final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger)) {
+                    final Record record = reader.nextRecord();
+                    final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+                    tuple = new Tuple<>(recordField, record);
+                }
+            }
+        } else if (KafkaProcessorUtils.KEY_AS_STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key",
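The bundling step in the diff above keys each output FlowFile on a `BundleInformation` (topic/partition, record schema, attributes, and the record key when `separateByKey` is set) and only creates a new writer when `bundleMap` has no entry for that key. A minimal, self-contained sketch of that get-or-create grouping, assuming plain strings in place of NiFi's schema and attribute types and leaving attributes out for brevity; `BundleSketch`, `BundleKey` and `Message` are hypothetical names, not NiFi classes:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-ins for the bundling logic in ConsumerLease; not NiFi classes.
final class BundleSketch {

    /** One consumed message: origin, schema id, key, and value (all simplified to strings). */
    static final class Message {
        final String topic;
        final int partition;
        final String schemaId;
        final String key;
        final String value;

        Message(String topic, int partition, String schemaId, String key, String value) {
            this.topic = topic;
            this.partition = partition;
            this.schemaId = schemaId;
            this.key = key;
            this.value = value;
        }
    }

    /** Plays the role of BundleInformation: equal keys mean "same output FlowFile". */
    static final class BundleKey {
        final String topic;
        final int partition;
        final String schemaId;
        final String recordKey; // null unless records are separated by key

        BundleKey(String topic, int partition, String schemaId, String recordKey) {
            this.topic = topic;
            this.partition = partition;
            this.schemaId = schemaId;
            this.recordKey = recordKey;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof BundleKey)) {
                return false;
            }
            BundleKey other = (BundleKey) o;
            return partition == other.partition
                    && Objects.equals(topic, other.topic)
                    && Objects.equals(schemaId, other.schemaId)
                    && Objects.equals(recordKey, other.recordKey);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition, schemaId, recordKey);
        }
    }

    /** Get-or-create per bundle key, like bundleMap.get(...) followed by bundleMap.put(...) on a miss. */
    static Map<BundleKey, List<String>> bundle(List<Message> messages, boolean separateByKey) {
        Map<BundleKey, List<String>> bundles = new LinkedHashMap<>();
        for (Message m : messages) {
            BundleKey bundleKey = new BundleKey(m.topic, m.partition, m.schemaId, separateByKey ? m.key : null);
            bundles.computeIfAbsent(bundleKey, k -> new ArrayList<>()).add(m.value);
        }
        return bundles;
    }
}
```

With `separateByKey` off, all messages from one partition with one schema land in a single bundle; turning it on splits them further by key, which is why the key is folded into the map key rather than checked separately.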
markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r976575896

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##
@@ -591,51 +607,18 @@ private void writeRecordData(final ProcessSession session, final List
markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r931354937

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##
@@ -239,6 +251,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
                 + "If not specified, no FlowFile attributes will be added as headers.")
             .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
             .expressionLanguageSupported(NONE)
+            .dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_VALUE)

Review Comment:
I'm not sure that it's necessary here. In the case of the Consumer, we are passing it down the stack (or "up the stack"?) so that the lease knows what to do with it. Here, it doesn't look like we're passing the Publish Strategy to any other classes.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r931325919

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##
@@ -593,6 +609,9 @@ private void writeRecordData(final ProcessSession session, final List
markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r930324902

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##
@@ -256,5 +256,133 @@ SASL_SSL ssl.client.auth property.
+Output Modes
+
+This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property 'Consume
+Strategy') for converting Kafka records into FlowFiles.
+
+Consume Strategy 'Write Value Only' (the default) emits flowfile records containing only the Kafka
+record value.
+
+Consume Strategy 'Use Wrapper' (new) emits flowfile records containing the Kafka record key, value,
+and headers, as well as additional metadata from the Kafka record.
+
+The record schema that is used when 'Use Wrapper' is active is as follows (in Avro format):
+
+[
+  {
+    "type": "record",
+    "name": "kafka:ConsumeRecord:metadata",
+    "namespace": "org.apache.nifi",
+    "fields": [{
+      "name": "key",
+      "type": ["bytes", "string", "record"]
+    }, {
+      "name": "topic",
+      "type": "string"
+    }, {
+      "name": "partition",
+      "type": "int"
+    }, {
+      "name": "offset",
+      "type": "long"
+    }, {
+      "name": "timestamp",
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    }]
+  },
+  {
+    "type": "record",
+    "name": "kafka:ConsumeRecord:wrapper",
+    "namespace": "org.apache.nifi",
+    "fields": [{
+      "name": "key",
+      "type": ["bytes", "string", "record"]
+    }, {
+      "name": "value",
+      "type": "record"
+    }, {
+      "name": "headers",
+      "type": "map",
+      "values": "string"
+    }, {
+      "name": "metadata",
+      "type": "kafka:ConsumeRecord:metadata"
+    }]
+  }
+]

Review Comment:
I think I would document the schema as:
```
{
  "type": "record",
  "name": "nifiRecord",
  "namespace": "org.apache.nifi",
  "fields": [{
      "name": "key",
      "type": [{ }, "null"]
    }, {
      "name": "value",
      "type": [{ }, "null"]
    }, {
      "name": "headers",
      "type": [{ "type": "map", "values": "string", "default": {} }, "null"]
    }, {
      "name": "metadata",
      "type": [{
          "type": "record",
          "name": "metadataType",
          "fields": [
            { "name": "topic", "type": ["string", "null"] },
            { "name": "partition", "type": ["int", "null"] },
            { "name": "offset", "type": ["int", "null"] },
            { "name": "timestamp", "type": ["long", "null"] }
          ]
        }, "null"]
    }]
}
```

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##
@@ -256,5 +256,133 @@ SASL_SSL ssl.client.auth property.
+Output Modes
+
+This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property 'Consume

Review Comment:
Looks like this will likely not make it into 1.17, unfortunately. Will probably need to update that comment. Can just remove the version altogether, as we generate documentation per version.

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##
@@ -271,7 +284,13 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
             .defaultValue("UTF-8")
             .required(false)
             .build();
-
+    static final PropertyDescriptor RECORD_KEY_WRITER = new PropertyDescriptor.Builder()
+            .name("record-key-writer")
+            .displayName("Record Key Writer")

Review Comment:
Given that we're adding a Record Key Writer, we should probably change the display name of "Record Writer" to "Record Value Writer" for clarity.

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##
@@ -212,6 +217,31 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
             .defaultValue("UTF-8")
             .required(false)
             .build();
+    static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("output-strategy")
+            .displayName("Output Strategy")
+
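To make the suggested "nifiRecord" layout concrete, here is a hedged sketch that builds one wrapper value as nested Java maps: top-level `key`, `value`, `headers` and `metadata`, with `metadata` holding `topic`, `partition`, `offset` and `timestamp`. Plain `Map`s stand in for NiFi's `MapRecord`/`RecordSchema`, and `WrapperSketch.wrap` is a hypothetical helper. The offset is carried as a `long` here because the Kafka client's `ConsumerRecord.offset()` returns a `long`.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration; plain Maps stand in for NiFi's MapRecord and RecordSchema.
final class WrapperSketch {

    static Map<String, Object> wrap(Object key, Object value, Map<String, String> headers,
                                    String topic, int partition, long offset, long timestampMillis) {
        // "metadata" sub-record: topic, partition, offset, timestamp
        Map<String, Object> metadata = new LinkedHashMap<>();
        metadata.put("topic", topic);
        metadata.put("partition", partition);
        metadata.put("offset", offset);             // ConsumerRecord.offset() is a long in the Kafka client
        metadata.put("timestamp", timestampMillis); // epoch milliseconds

        // Headers default to an empty map, mirroring "default": {} in the suggested schema.
        Map<String, String> safeHeaders = headers == null ? Collections.<String, String>emptyMap() : headers;

        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put("key", key);     // nullable: the schema unions the key type with "null"
        wrapper.put("value", value); // nullable as well, e.g. for a Kafka tombstone
        wrapper.put("headers", safeHeaders);
        wrapper.put("metadata", metadata);
        return wrapper;
    }
}
```

Keeping every top-level field nullable (as the suggested schema does) lets a record writer emit a wrapper even when a message has no key or a null value, instead of failing schema validation.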
markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r921594339

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##
@@ -193,10 +196,24 @@ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSet
                     additionalAttributes = writeResult.getAttributes();
                     writer.flush();
                 }
-
                 final byte[] messageContent = baos.toByteArray();
-                final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
-                final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+
+                final byte[] messageKey;
+                if ((recordKeyWriterFactory == null) || (messageKeyField == null)) {
+                    messageKey = Optional.ofNullable(record.getAsString(messageKeyField))
+                            .map(s -> s.getBytes(StandardCharsets.UTF_8)).orElse(null);
+                } else {
+                    try (final ByteArrayOutputStream os = new ByteArrayOutputStream(1024)) {
+                        final Record keyRecord = Optional.ofNullable(record.getValue(messageKeyField))
+                                .filter(Record.class::isInstance).map(Record.class::cast)
+                                .orElseThrow(() -> new IOException("The property 'Record Key Writer' is defined, but the record key is not a record"));

Review Comment:
I think I would actually do something like:
```
final byte[] key;
final Object keyValue = record.getValue(messageKeyField);
if (keyValue == null) {
    key = null;
} else if (keyValue instanceof byte[]) {
    key = (byte[]) keyValue;
} else if (keyValue instanceof Byte[]) {
    // This case exists because in our Record API we currently don't have a BYTES type, we use an Array of type Byte, which creates a Byte[] instead of a byte[]. We should address this in the future, but we should account for the log here.
    final Byte[] bytes = (Byte[]) keyValue;
    key = new byte[bytes.length];
    for (int i = 0; i < bytes.length; i++) {
        key[i] = bytes[i];
    }
} else if (keyValue instanceof Record) {
    ... // what you have below, to create record writer, write key, flush
} else {
    final String keyString = keyValue.toString();
    key = keyString.getBytes(StandardCharsets.UTF_8);
}
```
I.e., just look at the object that you have. Using the schema is also feasible. But it can get really complicated if the schema indicates that it's a CHOICE type, because then you'll probably still end up having to do this type of logic.
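A self-contained sketch of the type-inspection approach suggested above, covering the `null`, `byte[]`, `Byte[]` and "anything else" branches. The `Record` branch is left out because it needs NiFi's record writer; in this sketch a record value would simply fall through to `toString()`, which the real code should not do. `KeyBytesSketch.toKeyBytes` is a hypothetical name used only for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the suggested key handling; not NiFi code.
final class KeyBytesSketch {

    static byte[] toKeyBytes(final Object keyValue) {
        if (keyValue == null) {
            return null; // no key field value: publish with a null Kafka key
        }
        if (keyValue instanceof byte[]) {
            return (byte[]) keyValue; // already raw bytes, use as-is
        }
        if (keyValue instanceof Byte[]) {
            // The Record API models bytes as an ARRAY of BYTE, i.e. Byte[]: unbox element by element.
            final Byte[] boxed = (Byte[]) keyValue;
            final byte[] bytes = new byte[boxed.length];
            for (int i = 0; i < boxed.length; i++) {
                bytes[i] = boxed[i];
            }
            return bytes;
        }
        // Record-typed keys are omitted here (they would go through the Record Key Writer).
        // Everything else (strings, numbers, ...) is sent as its UTF-8 string form.
        return keyValue.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

Dispatching on the runtime type, as the comment proposes, sidesteps CHOICE schemas entirely: whatever the declared type, the branch is chosen by what `getValue` actually returned.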
markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r921422679

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##
@@ -271,7 +273,21 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
             .defaultValue("UTF-8")
             .required(false)
             .build();
-
+    static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder()
+            .name("publish-strategy")
+            .displayName("Publish Strategy")
+            .description("The format used to publish the outgoing FlowFile record to Kafka.")
+            .required(true)
+            .defaultValue(WRITE_VALUE_ONLY.getValue())
+            .allowableValues(WRITE_VALUE_ONLY, USE_WRAPPER)

Review Comment:
Yes, sorry, I was suggesting renaming it. Perhaps something like "Use Content as Record Value" or "FlowFile Content as Record Value"? I was just trying to convey that we aren't *only* writing the record value; a key and headers may also be written.
markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r902981711

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##
@@ -212,6 +217,31 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
             .defaultValue("UTF-8")
             .required(false)
             .build();
+    static final PropertyDescriptor CONSUME_STRATEGY = new PropertyDescriptor.Builder()
+            .name("consume-strategy")
+            .displayName("Consume Strategy")

Review Comment:
This doesn't really control how we consume data from Kafka. Rather, it controls how we write out the data. So perhaps "Output Strategy" is more suitable?

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##
@@ -271,7 +273,21 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
             .defaultValue("UTF-8")
             .required(false)
             .build();
-
+    static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder()

Review Comment:
With the addition of this property, I think some of the existing properties can also be hidden if using the wrapper. Specifically, just like in the consumer, we should make MESSAGE_KEY_FIELD and ATTRIBUTE_NAME_REGEX dependent properties because it doesn't make sense to specify those if using the Wrapper.

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java:
##
@@ -110,6 +110,12 @@ public final class KafkaProcessorUtils {
             "When unable to publish a FlowFile to Kafka, the FlowFile will be placed back on the top of its queue so that it will be the next FlowFile tried again. "
                 + "For dataflows where ordering of FlowFiles is important, this strategy can be used along with ensuring that the each processor in the dataflow uses only a single Concurrent Task.");
+    static final AllowableValue WRITE_VALUE_ONLY = new AllowableValue("write-value-only", "Write Value Only", "Write only the Kafka Record value.");
+    static final AllowableValue USE_WRAPPER = new AllowableValue("use-wrapper", "Use Wrapper", "Write the Kafka Record key, value, headers, and metadata.");

Review Comment:
Perhaps we should mention in the description explicitly that these fields will be wrapped in a wrapper element, and to see Processor Usage for more information.

## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##
@@ -256,5 +256,69 @@ SASL_SSL ssl.client.auth property.
+Output Modes
+
+This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property 'Consume
+Strategy') for converting Kafka records into FlowFiles.
+
+Consume Strategy 'Write Value Only' (the default) emits flowfile records containing only the Kafka
+record value.
+
+Consume Strategy 'Use Wrapper' (new) emits flowfile records containing the Kafka record key, value,
+and headers, as well as additional metadata from the Kafka record.
+
+The record schema that is used when 'Use Wrapper' is active is as follows:
+
+key: one of RECORD, STRING, or BYTE_ARRAY (based on processor configuration property 'Key Format')
+
+value: RECORD
+headers: MAP (STRING, STRING)
+metadata: RECORD
+
+topic: STRING
+partition: INTEGER
+offset: LONG
+timestamp: TIMESTAMP
+
+If the Consume Strategy property 'Use Wrapper' is active, an additional processor configuration property
+('Key Format') is activated. This property is used to fine-tune the transformation of the incoming Kafka
+record. The possible values for 'Key Format' are 'Byte Array', 'String', or 'Record'.
+
+'Byte Array' supplies the Kafka Record Key bytes unchanged from the incoming Kafka record.
+'String' converts the Kafka Record Key bytes into a string using the UTF-8 character encoding.
+(Failure to parse the key bytes as UTF-8 will result in the record being routed to the
+'parse.failure' relationship.)
+
+'Record' converts the Kafka
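The 'String' Key Format above promises that key bytes which are not valid UTF-8 go to 'parse.failure'. Detecting that requires a strict decoder: `new String(bytes, StandardCharsets.UTF_8)` silently substitutes replacement characters for malformed input and never fails. A minimal sketch using the JDK's `CharsetDecoder` with `CodingErrorAction.REPORT`; `KeyFormatSketch.decodeKeyStrict` is a hypothetical helper, and an empty `Optional` stands in for "route to parse.failure":

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper; not part of NiFi.
final class KeyFormatSketch {

    /**
     * Decodes key bytes as UTF-8, reporting (rather than replacing) malformed input.
     * An empty Optional marks the case the documentation routes to 'parse.failure'.
     */
    static Optional<String> decodeKeyStrict(final byte[] keyBytes) {
        // Mirror the diff's handling of a missing key: treat null as an empty byte array.
        final byte[] bytes = keyBytes == null ? new byte[0] : keyBytes;
        final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        try {
            return Optional.of(decoder.decode(ByteBuffer.wrap(bytes)).toString());
        } catch (final CharacterCodingException e) {
            return Optional.empty();
        }
    }
}
```

A fresh decoder is created per call because `CharsetDecoder` instances are stateful and not safe for concurrent use.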