http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java index d02d011..905b7fc 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java @@ -46,6 +46,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -88,7 +89,7 @@ public class ConsumeKafka extends AbstractProcessor { .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() @@ -97,7 +98,7 @@ public class ConsumeKafka extends AbstractProcessor { .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() @@ -124,7 +125,7 @@ public class ConsumeKafka extends AbstractProcessor { .displayName("Message Demarcator") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains " + "all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use " + "for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received "
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index cbe2e24..77a7f21 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; @@ -74,7 +75,7 @@ final class KafkaProcessorUtils { .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") .required(true) .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("localhost:9092") .build(); static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() @@ -82,7 +83,7 @@ final class KafkaProcessorUtils { .displayName("Security Protocol") .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) .defaultValue(SEC_PLAINTEXT.getValue()) .build(); @@ -94,7 +95,7 @@ final class KafkaProcessorUtils { + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl.context.service") http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index 32d1ea7..0430f0c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -45,6 +45,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -102,7 +103,7 @@ public class PublishKafka extends AbstractProcessor { .description("The name of the Kafka Topic to publish to.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() @@ -110,7 +111,7 @@ public class PublishKafka extends AbstractProcessor { .displayName("Delivery Guarantee") .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); @@ -122,7 +123,7 @@ public class PublishKafka extends AbstractProcessor { + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("5 sec") .build(); @@ -132,7 +133,7 @@ public class PublishKafka extends AbstractProcessor { .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .defaultValue("5 secs") .build(); @@ -156,7 +157,7 @@ public class PublishKafka extends AbstractProcessor { + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() @@ -173,7 +174,7 @@ public class PublishKafka extends AbstractProcessor { .displayName("Message Demarcator") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java index a64cb8e..efcc242 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java @@ -45,6 +45,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -89,7 +90,7 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor { .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder() @@ -106,7 +107,7 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor { .displayName("Record Reader") .description("The Record Reader to use for incoming FlowFiles") .identifiesControllerService(RecordReaderFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -115,7 +116,7 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor { .displayName("Record Writer") .description("The Record Writer to use in order to serialize the data before sending to Kafka") .identifiesControllerService(RecordSetWriterFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -125,7 +126,7 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor { .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() @@ -167,7 +168,7 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor { + "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If " + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait " + "for the producer to finish its entire transaction instead of pulling as the messages become available.") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) @@ -191,7 +192,7 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor { + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling " + "the messages together efficiently.") .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(false) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java index fdc2cb3..ff6a250 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java @@ -44,6 +44,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -88,7 +89,7 @@ public class ConsumeKafka_1_0 extends AbstractProcessor { .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder() @@ -106,7 +107,7 @@ public class ConsumeKafka_1_0 extends AbstractProcessor { .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() @@ -133,7 +134,7 @@ public class ConsumeKafka_1_0 extends AbstractProcessor { .displayName("Message Demarcator") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains " + "all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use " + "for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received " @@ -150,7 +151,7 @@ public class ConsumeKafka_1_0 extends AbstractProcessor { + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling " + "the messages together efficiently.") .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(false) .build(); @@ -184,7 +185,7 @@ public class ConsumeKafka_1_0 extends AbstractProcessor { + "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If " + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait " + "for the producer to finish its entire transaction instead of pulling as the messages become available.") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index d835607..5b65b0d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.kerberos.KerberosCredentialsService; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; @@ -76,7 +77,7 @@ final class KafkaProcessorUtils { .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") .required(true) .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("localhost:9092") .build(); static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() @@ -84,7 +85,7 @@ final class KafkaProcessorUtils { .displayName("Security Protocol") .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) .defaultValue(SEC_PLAINTEXT.getValue()) .build(); @@ -96,7 +97,7 @@ final class KafkaProcessorUtils { + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder() .name("sasl.kerberos.principal") @@ -105,7 +106,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder() .name("sasl.kerberos.keytab") @@ -114,7 +115,7 @@ final class KafkaProcessorUtils { + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") .required(false) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl.context.service") http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java index e26d665..d0f368d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java @@ -46,6 +46,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -107,7 +108,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor { .description("The name of the Kafka Topic to publish to.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() @@ -115,7 +116,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor { .displayName("Record Reader") .description("The Record Reader to use for incoming FlowFiles") .identifiesControllerService(RecordReaderFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -124,7 +125,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor { .displayName("Record Writer") .description("The Record Writer to use in order to serialize the data before sending to Kafka") .identifiesControllerService(RecordSetWriterFactory.class) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -133,7 +134,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor { .displayName("Message Key Field") .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(false) .build(); @@ -142,7 +143,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor { .displayName("Delivery Guarantee") .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); @@ -154,7 +155,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor { + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("5 sec") .build(); @@ -164,7 +165,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor { .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .defaultValue("5 secs") .build(); @@ -204,7 +205,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor { + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. " + "If not specified, no FlowFile attributes will be added as headers.") .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(false) .build(); static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder() @@ -214,7 +215,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor { + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. " + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true " + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java index 48f7747..d7e910d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java @@ -47,6 +47,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; @@ -102,7 +103,7 @@ public class PublishKafka_1_0 extends AbstractProcessor { .description("The name of the Kafka Topic to publish to.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() @@ -110,7 +111,7 @@ public class PublishKafka_1_0 extends AbstractProcessor { .displayName("Delivery Guarantee") .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); @@ -122,7 +123,7 @@ public class PublishKafka_1_0 extends AbstractProcessor { + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue("5 sec") .build(); @@ -132,7 +133,7 @@ public class PublishKafka_1_0 extends AbstractProcessor { .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .defaultValue("5 secs") .build(); @@ -156,7 +157,7 @@ public class PublishKafka_1_0 extends AbstractProcessor { + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() @@ -173,7 +174,7 @@ public class PublishKafka_1_0 extends AbstractProcessor { .displayName("Message Demarcator") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " @@ -206,7 +207,7 @@ public class PublishKafka_1_0 extends AbstractProcessor { + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. " + "If not specified, no FlowFile attributes will be added as headers.") .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(false) .build(); static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder() @@ -216,7 +217,7 @@ public class PublishKafka_1_0 extends AbstractProcessor { + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. " + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true " + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java index 32b83b8..65dcd5f 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java @@ -35,6 +35,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.hadoop.HadoopValidators; @@ -56,7 +57,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor { + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.") .required(false) .addValidator(HadoopValidators.ONE_OR_MORE_FILE_EXISTS_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); protected static final Validator RECOGNIZED_URI = new Validator() { http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java index a3fffc3..a3602e3 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import org.apache.avro.Schema; @@ -45,6 +46,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -61,7 +63,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import java.util.concurrent.atomic.AtomicLong; @Tags({ "avro", "convert", "kite" }) @CapabilityDescription("Convert records from one Avro schema to another, including support for flattening and simple type conversions") @@ -155,7 +156,7 @@ public class ConvertAvroSchema extends AbstractKiteConvertProcessor { .name("Input Schema") .description("Avro Schema of Input Flowfiles. This can be a URI (dataset, view, or resource) or literal JSON schema.") .addValidator(SCHEMA_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true) .build(); @@ -164,7 +165,7 @@ public class ConvertAvroSchema extends AbstractKiteConvertProcessor { .name("Output Schema") .description("Avro Schema of Output Flowfiles. This can be a URI (dataset, view, or resource) or literal JSON schema.") .addValidator(MAPPED_SCHEMA_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true).build(); @VisibleForTesting http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java index 9702916..68e2647 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java @@ -39,6 +39,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -102,7 +103,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("Record schema") .description("Outgoing Avro schema for each record created from a CSV row") .addValidator(SCHEMA_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true) .build(); @@ -111,7 +112,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("CSV charset") .description("Character set for CSV files") .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue(DEFAULTS.charset) .build(); @@ -120,7 +121,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("CSV delimiter") .description("Delimiter character for CSV records") .addValidator(CHAR_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue(DEFAULTS.delimiter) .build(); @@ -129,7 +130,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("CSV quote character") .description("Quote character for CSV values") .addValidator(CHAR_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue(DEFAULTS.quote) .build(); @@ -138,7 +139,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("CSV escape character") .description("Escape character for CSV values") .addValidator(CHAR_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue(DEFAULTS.escape) .build(); @@ -147,7 +148,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("Use CSV header line") .description("Whether to use the first line as a header") .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue(String.valueOf(DEFAULTS.useHeader)) .build(); @@ -156,7 +157,7 @@ public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { .name("Lines to skip") .description("Number of lines to skip before reading header or data") .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true)) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue(String.valueOf(DEFAULTS.linesToSkip)) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java index 1127a2d..f93e522 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; @@ -32,6 +33,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -48,7 +50,6 @@ import org.kitesdk.data.spi.filesystem.JSONFileReader; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import java.util.concurrent.atomic.AtomicLong; @Tags({"kite", "json", "avro"}) @InputRequirement(Requirement.INPUT_REQUIRED) @@ -76,7 +77,7 @@ public class ConvertJSONToAvro extends AbstractKiteConvertProcessor { .name("Record schema") .description("Outgoing Avro schema for each record created from a JSON object") .addValidator(SCHEMA_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java index 0edbd2b..4344ce0 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java @@ -32,6 +32,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; @@ -154,7 +155,7 @@ public class InferAvroSchema " as the underlying data. Setting this property will cause the value of" + " \"" + GET_CSV_HEADER_DEFINITION_FROM_INPUT.getName() + "\" to be ignored instead using this value.") .required(false) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue(null) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -170,14 +171,14 @@ public class InferAvroSchema " no data is skipped.") .required(true) .defaultValue("0") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); public static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder() .name("CSV delimiter") .description("Delimiter character for CSV records") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(CHAR_VALIDATOR) .defaultValue(",") .build(); @@ -188,7 +189,7 @@ public class InferAvroSchema " in the CSV FlowFile content data.") .required(true) .defaultValue("\\") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -198,7 +199,7 @@ public class InferAvroSchema " character in the CSV FlowFile content data.") .required(true) .defaultValue("'") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -207,7 +208,7 @@ public class InferAvroSchema .description("Value to be placed in the Avro record schema \"name\" field. The value must adhere to the Avro naming " + "rules for fullname. If Expression Language is present then the evaluated value must adhere to the Avro naming rules.") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.createRegexMatchingValidator(AVRO_RECORD_NAME_PATTERN)) .build(); @@ -216,7 +217,7 @@ public class InferAvroSchema .description("Character encoding of CSV data.") .required(true) .defaultValue("UTF-8") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); @@ -236,7 +237,7 @@ public class InferAvroSchema " the appropriate type. However the default value of 10 is almost always enough.") .required(true) .defaultValue("10") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java index 1a39664..7730554 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java @@ -32,6 +32,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -76,7 +77,7 @@ public class StoreInKiteDataset extends AbstractKiteProcessor { .name("Target dataset URI") .description("URI that identifies a Kite dataset where data will be stored") .addValidator(RECOGNIZED_URI) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java index 359e817..68e7004 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java @@ -33,6 +33,7 @@ import org.apache.kudu.client.SessionConfiguration; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; @@ -61,7 +62,7 @@ public abstract class AbstractKudu extends AbstractProcessor { .description("List all kudu masters's ip with port (e.g. 7051), comma separated") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() @@ -69,7 +70,7 @@ public abstract class AbstractKudu extends AbstractProcessor { .description("The name of the Kudu Table to put data into") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() @@ -117,7 +118,7 @@ public abstract class AbstractKudu extends AbstractProcessor { .defaultValue("100") .required(true) .addValidator(StandardValidators.createLongValidator(2, 100000, true)) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); protected static final Relationship REL_SUCCESS = new Relationship.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java b/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java index 3477754..5f1ae88 100644 --- a/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java +++ b/nifi-nar-bundles/nifi-language-translation-bundle/nifi-yandex-processors/src/main/java/org/apache/nifi/processors/yandex/YandexTranslate.java @@ -30,6 +30,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -75,7 +76,7 @@ import java.util.Set; }) @DynamicProperty(name = "The name of an attribute to set that will contain the translated text of the value", value = "The value to translate", - supportsExpressionLanguage = true, + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description = "User-defined properties are used to translate arbitrary text based on attributes.") public class YandexTranslate extends AbstractProcessor { @@ -90,7 +91,7 @@ public class YandexTranslate extends AbstractProcessor { .description("The language of incoming data") .required(true) .defaultValue("es") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(new LanguageNameValidator()) .build(); public static final PropertyDescriptor TARGET_LANGUAGE = new PropertyDescriptor.Builder() @@ -98,7 +99,7 @@ public class YandexTranslate extends AbstractProcessor { .description("The language to translate the text into") .required(true) .defaultValue("en") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(new LanguageNameValidator()) .build(); public static final PropertyDescriptor TRANSLATE_CONTENT = new PropertyDescriptor.Builder() @@ -113,7 +114,7 @@ public class YandexTranslate extends AbstractProcessor { .description("Specifies the character set of the data to be translated") .required(true) .defaultValue("UTF-8") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); @@ -169,7 +170,7 @@ public class YandexTranslate extends AbstractProcessor { return new PropertyDescriptor.Builder() .name(propertyDescriptorName) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .dynamic(true) .build(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java b/nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java index 176561f..40c034b 100644 --- a/nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java +++ b/nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/image/ResizeImage.java @@ -43,6 +43,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -70,14 +71,14 @@ public class ResizeImage extends AbstractProcessor { .name("Image Width (in pixels)") .description("The desired number of pixels for the image's width") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); static final PropertyDescriptor IMAGE_HEIGHT = new PropertyDescriptor.Builder() .name("Image Height (in pixels)") .description("The desired number of pixels for the image's height") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); static final PropertyDescriptor SCALING_ALGORITHM = new PropertyDescriptor.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java b/nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java index 388bc73..c89853b 100644 --- a/nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java +++ b/nifi-nar-bundles/nifi-media-bundle/nifi-media-processors/src/main/java/org/apache/nifi/processors/media/ExtractMediaMetadata.java @@ -37,6 +37,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; @@ -106,7 +107,7 @@ public class ExtractMediaMetadata extends AbstractProcessor { + " added by the processor.") .required(false) .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static final Relationship SUCCESS = new Relationship.Builder() http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java index 55623ce..32c6449 100644 --- a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java @@ -28,6 +28,7 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.metrics.reporting.task.MetricsReportingTask; import org.apache.nifi.processor.util.StandardValidators; @@ -57,7 +58,7 @@ public class GraphiteMetricReporterService extends AbstractControllerService imp .description("The hostname of the carbon listener") .required(true) .addValidator(StandardValidators.URI_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); /** @@ -69,7 +70,7 @@ public class GraphiteMetricReporterService extends AbstractControllerService imp .description("The port on which carbon listens") .required(true) .addValidator(StandardValidators.PORT_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); /** @@ -94,7 +95,7 @@ public class GraphiteMetricReporterService extends AbstractControllerService imp .required(true) .defaultValue("nifi") .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); /** http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java index 37eb194..dfd6f79 100644 --- a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java @@ -25,6 +25,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.metrics.FlowMetricSet; import org.apache.nifi.metrics.reporting.reporter.service.MetricReporterService; import org.apache.nifi.processor.util.StandardValidators; @@ -73,7 +74,7 @@ public class MetricsReportingTask extends AbstractReportingTask { .description("The id of the process group to report. If not specified, metrics of the root process group" + "are reported.") .required(false) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index bd2c0e3..5bef4a8 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -30,6 +30,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.authentication.exception.ProviderCreationException; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -61,7 +62,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .displayName("Mongo URI") .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() @@ -69,14 +70,14 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .displayName("Mongo Database Name") .description("The name of the database to use") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() .name("Mongo Collection Name") .description("The name of the collection to use") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() @@ -130,7 +131,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .name("mongo-query-attribute") .displayName("Query Output Attribute") .description("If set, the query will be written to a specified attribute on the output flowfiles.") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) .required(false) .build(); @@ -141,7 +142,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .required(true) .defaultValue("UTF-8") .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); static List<PropertyDescriptor> descriptors = new ArrayList<>(); @@ -264,7 +265,8 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { return writeConcern; } - protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, Map extraAttributes, Relationship rel) throws UnsupportedEncodingException { + protected void writeBatch(String payload, FlowFile parent, ProcessContext context, ProcessSession session, + Map<String, String> extraAttributes, Relationship rel) throws UnsupportedEncodingException { String charset = parent != null ? context.getProperty(CHARSET).evaluateAttributeExpressions(parent).getValue() : context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 7f79e9d..fbd50b6 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -31,6 +31,7 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; @@ -95,7 +96,7 @@ public class GetMongo extends AbstractMongoProcessor { "the flowfile's body. If this field is left blank and a timer is enabled instead of an incoming connection, " + "that will result in a full collection fetch using a \"{}\" query.") .required(false) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(DOCUMENT_VALIDATOR) .build(); @@ -103,21 +104,21 @@ public class GetMongo extends AbstractMongoProcessor { .name("Projection") .description("The fields to be returned from the documents in the result set; must be a valid BSON document") .required(false) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(DOCUMENT_VALIDATOR) .build(); static final PropertyDescriptor SORT = new PropertyDescriptor.Builder() .name("Sort") .description("The fields by which to sort; must be a valid BSON document") .required(false) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(DOCUMENT_VALIDATOR) .build(); static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder() .name("Limit") .description("The maximum number of elements to return") .required(false) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); @@ -125,7 +126,7 @@ public class GetMongo extends AbstractMongoProcessor { .name("Batch Size") .description("The number of elements returned from the server in one batch") .required(false) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); static final PropertyDescriptor RESULTS_PER_FLOWFILE = new PropertyDescriptor.Builder() @@ -133,7 +134,7 @@ public class GetMongo extends AbstractMongoProcessor { .displayName("Results Per FlowFile") .description("How many results to put into a flowfile at once. The whole body will be treated as a JSON array of results.") .required(false) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); @@ -164,7 +165,7 @@ public class GetMongo extends AbstractMongoProcessor { .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + " may cause problems for other JSON parsers that expect only standard JSON types and conventions. This configuration setting " + " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .build(); @@ -253,7 +254,7 @@ public class GetMongo extends AbstractMongoProcessor { final ComponentLog logger = getLogger(); - Map attributes = new HashMap(); + Map<String, String> attributes = new HashMap<String, String>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); final Document query; http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java index d5ec667..668d023 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java @@ -30,11 +30,13 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.bson.Document; import org.bson.conversions.Bson; import com.fasterxml.jackson.databind.ObjectMapper; @@ -74,7 +76,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor { ObjectMapper mapper = new ObjectMapper(); List<Map> values = mapper.readValue(query, List.class); - for (Map val : values) { + for (Map<?, ?> val : values) { result.add(new BasicDBObject(val)); } @@ -102,7 +104,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor { static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() .name("mongo-agg-query") .displayName("Query") - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .description("The aggregation query to be executed.") .required(true) .addValidator(AGG_VALIDATOR) @@ -137,7 +139,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor { return propertyDescriptors; } - static String buildBatch(List batch) { + static String buildBatch(List<Document> batch) { ObjectMapper mapper = new ObjectMapper(); String retVal; try { @@ -165,27 +167,27 @@ public class RunMongoAggregation extends AbstractMongoProcessor { Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); - Map attrs = new HashMap(); + Map<String, String> attrs = new HashMap<String, String>(); if (queryAttr != null && queryAttr.trim().length() > 0) { attrs.put(queryAttr, query); } - MongoCollection collection = getCollection(context); - MongoCursor iter = null; + MongoCollection<Document> collection = getCollection(context); + MongoCursor<Document> iter = null; try { List<Bson> aggQuery = buildAggregationQuery(query); - AggregateIterable it = collection.aggregate(aggQuery); + AggregateIterable<Document> it = collection.aggregate(aggQuery); it.batchSize(batchSize != null ? batchSize : 1); iter = it.iterator(); - List batch = new ArrayList(); + List<Document> batch = new ArrayList<Document>(); while (iter.hasNext()) { batch.add(iter.next()); if (batch.size() == resultsPerFlowfile) { writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); - batch = new ArrayList(); + batch = new ArrayList<Document>(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java index a079a2b..71bfc2b 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/GetMongoIT.java @@ -359,7 +359,6 @@ public class GetMongoIT { runner.setIncomingConnection(true); runner.setProperty(GetMongo.QUERY, query); runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "10"); - runner.setValidateExpressionUsage(true); runner.enqueue("test", attributes); runner.run(1, true, true); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java index d6c489a..f2ddbca 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java @@ -59,7 +59,6 @@ public class RunMongoAggregationIT { runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); runner.setProperty(RunMongoAggregation.QUERY_ATTRIBUTE, AGG_ATTR); - runner.setValidateExpressionUsage(true); mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java index 8ac05b2..5b6c97e 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/AbstractMongoDBControllerService.java @@ -30,6 +30,7 @@ import org.apache.nifi.authentication.exception.ProviderCreationException; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.security.util.SslContextFactory; @@ -54,7 +55,7 @@ public class AbstractMongoDBControllerService extends AbstractControllerService .displayName("Mongo URI") .description("MongoURI, typically of the form: mongodb://host1[:port1][,host2[:port2],...]") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(Validation.DOCUMENT_VALIDATOR) .build(); protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() @@ -62,7 +63,7 @@ public class AbstractMongoDBControllerService extends AbstractControllerService .displayName("Mongo Database Name") .description("The name of the database to use") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() @@ -70,7 +71,7 @@ public class AbstractMongoDBControllerService extends AbstractControllerService .displayName("Mongo Collection Name") .description("The name of the collection to use") .required(true) - .expressionLanguageSupported(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()