This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 852715a NIFI-7409: Azure managed identity support to Azure Datalake processors 852715a is described below commit 852715aadd9e989d37aeadecfe92093e212a5ad1 Author: sjyang18 <ilson...@hotmail.com> AuthorDate: Fri May 1 18:55:50 2020 +0000 NIFI-7409: Azure managed identity support to Azure Datalake processors NIFI-7409: review changes NIFI-7409: ordering import statements NIFI-7409: changed validateCredentialProperties logic This closes #4249. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../nifi-azure-processors/pom.xml | 16 +-- .../AbstractAzureDataLakeStorageProcessor.java | 111 +++++++++++++++------ ...eStorageCredentialsControllerServiceLookup.java | 3 +- .../storage/TestAbstractAzureDataLakeStorage.java | 19 +++- nifi-nar-bundles/nifi-azure-bundle/pom.xml | 42 ++++++++ 5 files changed, 147 insertions(+), 44 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 0a3602d..2cff8ee 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -57,6 +57,16 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-core</artifactId> + <version>1.5.0</version> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-identity</artifactId> + <version>1.0.6</version> + </dependency> + <dependency> <groupId>com.microsoft.azure</groupId> <artifactId>azure-eventhubs</artifactId> <version>${azure-eventhubs.version}</version> @@ -75,12 +85,6 @@ <artifactId>azure-storage-file-datalake</artifactId> <version>12.1.1</version> </dependency> - <!-- overriding jackson-core in azure-storage --> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - <version>2.10.3</version> - </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java index 40d276c..af75f99 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java @@ -16,30 +16,32 @@ */ package org.apache.nifi.processors.azure; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import com.azure.identity.ManagedIdentityCredential; +import com.azure.identity.ManagedIdentityCredentialBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; + +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.context.PropertyContext; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.commons.lang3.StringUtils; - -import com.azure.storage.common.StorageSharedKeyCredential; -import com.azure.storage.file.datalake.DataLakeServiceClient; -import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; - -import java.util.Arrays; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.Map; public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor { @@ -85,6 +87,13 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder() + .name("use-managed-identity") + .displayName("Use Azure Managed Identity") + .description("Choose whether or not to use the managed identity of Azure VM/VMSS ") + .required(false).defaultValue("false").allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build(); + public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder() .name("filesystem-name").displayName("Filesystem Name") .description("Name of the Azure Storage File System. It is assumed to be already existing.") @@ -110,6 +119,15 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc .defaultValue("${azure.filename}") .build(); + public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder() + .name("endpoint-suffix").displayName("Endpoint Suffix") + .description("Endpoint Suffix") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(false) + .defaultValue("dfs.core.windows.net") + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description( "Files that have been successfully written to Azure storage are transferred to this relationship") .build(); @@ -118,9 +136,14 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc .build(); private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList( - Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, - AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, AbstractAzureDataLakeStorageProcessor.FILESYSTEM, - AbstractAzureDataLakeStorageProcessor.DIRECTORY, AbstractAzureDataLakeStorageProcessor.FILE)); + Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, + AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY, + AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, + AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY, + AbstractAzureDataLakeStorageProcessor.ENDPOINT_SUFFIX, + AbstractAzureDataLakeStorageProcessor.FILESYSTEM, + AbstractAzureDataLakeStorageProcessor.DIRECTORY, + AbstractAzureDataLakeStorageProcessor.FILE)); private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet( new HashSet<>(Arrays.asList( @@ -134,17 +157,32 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc public static Collection<ValidationResult> validateCredentialProperties(final ValidationContext validationContext) { final List<ValidationResult> results = new ArrayList<>(); - final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue(); - final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue(); - final String sasToken = validationContext.getProperty(SAS_TOKEN).getValue(); - - if (StringUtils.isNotBlank(accountName) - && ((StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)))) { - results.add(new ValidationResult.Builder().subject("Azure Storage Credentials").valid(false) - .explanation("either " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName() + - " or " + ACCOUNT_NAME.getDisplayName() + " with " + SAS_TOKEN.getDisplayName() + - " must be specified, not both") - .build()); + + final boolean useManagedIdentity = validationContext.getProperty(USE_MANAGED_IDENTITY).asBoolean(); + final boolean accountKeyIsSet = validationContext.getProperty(ACCOUNT_KEY).isSet(); + final boolean sasTokenIsSet = validationContext.getProperty(SAS_TOKEN).isSet(); + + int credential_config_found = 0; + if(useManagedIdentity) credential_config_found++; + if(accountKeyIsSet) credential_config_found++; + if(sasTokenIsSet) credential_config_found++; + + if(credential_config_found == 0){ + final String msg = String.format( + "At least one of ['%s', '%s', '%s'] should be set", + ACCOUNT_KEY.getDisplayName(), + SAS_TOKEN.getDisplayName(), + USE_MANAGED_IDENTITY.getDisplayName() + ); + results.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); + } else if(credential_config_found > 1) { + final String msg = String.format( + "Only one of ['%s', '%s', '%s'] should be set", + ACCOUNT_KEY.getDisplayName(), + SAS_TOKEN.getDisplayName(), + USE_MANAGED_IDENTITY.getDisplayName() + ); + results.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); } return results; } @@ -154,7 +192,9 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue(); final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue(); final String sasToken = context.getProperty(SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue(); - final String endpoint = String.format("https://%s.dfs.core.windows.net", accountName); + final String endpointSuffix = context.getProperty(ENDPOINT_SUFFIX).evaluateAttributeExpressions(attributes).getValue(); + final String endpoint = String.format("https://%s.%s", accountName,endpointSuffix); + final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean(); DataLakeServiceClient storageClient; if (StringUtils.isNotBlank(accountKey)) { final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, @@ -164,6 +204,13 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc } else if (StringUtils.isNotBlank(sasToken)) { storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken) .buildClient(); + } else if(useManagedIdentity){ + final ManagedIdentityCredential misCrendential = new ManagedIdentityCredentialBuilder() + .build(); + storageClient = new DataLakeServiceClientBuilder() + .endpoint(endpoint) + .credential(misCrendential) + .buildClient(); } else { throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.", ACCOUNT_KEY.getDisplayName(), SAS_TOKEN.getDisplayName())); @@ -181,4 +228,4 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc public Set<Relationship> getRelationships() { return RELATIONSHIPS; } -} \ No newline at end of file +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java index 4ac2f07..1cfe1a7 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java @@ -16,12 +16,13 @@ */ package org.apache.nifi.services.azure.storage; +import java.util.Map; + import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup; -import java.util.Map; @Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" }) @CapabilityDescription("Provides an AzureStorageCredentialsService that can be used to dynamically select another AzureStorageCredentialsService. " + diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java index 59800bb..960b1fe 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java @@ -16,17 +16,18 @@ */ package org.apache.nifi.processors.azure.storage; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Before; -import org.junit.Test; - import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILE; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.SAS_TOKEN; +import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY; + +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; public class TestAbstractAzureDataLakeStorage { @@ -58,6 +59,14 @@ public class TestAbstractAzureDataLakeStorage { } @Test + public void testValidWhenAccountNameAndUseManagedIdentity() { + runner.removeProperty(ACCOUNT_KEY); + runner.setProperty(USE_MANAGED_IDENTITY, "true"); + + runner.assertValid(); + } + + @Test public void testNotValidWhenNoAccountNameSpecified() { runner.removeProperty(ACCOUNT_NAME); diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml index d4983e1..c5ecc49 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml @@ -27,6 +27,7 @@ <properties> <azure-storage.version>8.4.0</azure-storage.version> + <jackson.version>2.10.3</jackson.version> </properties> <modules> @@ -50,6 +51,47 @@ </exclusion> </exclusions> </dependency> + <!-- dependency convergency resolution --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-xml</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-jaxb-annotations</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.datatype</groupId> + <artifactId>jackson-datatype-jsr310</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.9</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-text</artifactId> + <version>1.8</version> + </dependency> </dependencies> </dependencyManagement> </project>