This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 7cd1a30948 NIFI-13105 Implemented FlowRegistryClient using GitHub for versioning 7cd1a30948 is described below commit 7cd1a30948638c1fec113a47c835cb991a3a44fb Author: Bryan Bende <bbe...@apache.org> AuthorDate: Tue Jan 30 17:29:31 2024 -0500 NIFI-13105 Implemented FlowRegistryClient using GitHub for versioning This closes #8765 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../nifi/registry/flow/FlowRegistryClient.java | 14 +- nifi-assembly/pom.xml | 6 + .../nifi-github-extensions/pom.xml | 43 ++ .../apache/nifi/github/FlowSnapshotSerializer.java | 34 ++ .../nifi/github/GitHubAuthenticationType.java | 29 + .../nifi/github/GitHubCreateContentRequest.java | 101 +++ .../nifi/github/GitHubFlowRegistryClient.java | 677 +++++++++++++++++++++ .../apache/nifi/github/GitHubRepositoryClient.java | 471 ++++++++++++++ .../nifi/github/JacksonFlowSnapshotSerializer.java | 58 ++ .../nifi/github/VersionedComponentModule.java | 64 ++ ...rg.apache.nifi.registry.flow.FlowRegistryClient | 15 + .../nifi/github/GitHubFlowRegistryClientTest.java | 202 ++++++ .../nifi-github-bundle/nifi-github-nar/pom.xml | 42 ++ nifi-extension-bundles/nifi-github-bundle/pom.xml | 36 ++ nifi-extension-bundles/pom.xml | 1 + .../flow/FlowAnalyzingRegistryClientNode.java | 5 + .../flow/StandardFlowRegistryClientNode.java | 7 +- .../nifi/registry/flow/FlowRegistryClientNode.java | 2 + .../apache/nifi/web/StandardNiFiServiceFacade.java | 10 +- .../org/apache/nifi/web/api/VersionsResource.java | 2 +- .../nifi/web/dao/impl/StandardFlowRegistryDAO.java | 7 +- 21 files changed, 1817 insertions(+), 9 deletions(-) diff --git a/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java index 749e5b880f..4436c55ef7 100644 --- a/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java +++ 
b/nifi-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java @@ -21,6 +21,7 @@ import org.apache.nifi.components.ConfigurableComponent; import java.io.IOException; import java.util.Optional; import java.util.Set; +import java.util.UUID; /** * <p> @@ -146,7 +147,7 @@ public interface FlowRegistryClient extends ConfigurableComponent { FlowRegistryBucket getBucket(FlowRegistryClientConfigurationContext context, BucketLocation bucketLocation) throws FlowRegistryException, IOException; /** - * Registers the given RegisteredFlow into the the Flow Registry. + * Registers the given RegisteredFlow into the Flow Registry. * * @param context Configuration context. * @param flow The RegisteredFlow to add to the Registry. @@ -252,4 +253,15 @@ public interface FlowRegistryClient extends ConfigurableComponent { * @throws IOException If there is issue with the communication between NiFi and the Flow Registry. */ Optional<String> getLatestVersion(FlowRegistryClientConfigurationContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException; + + /** + * Generates the id for registering a flow. + * + * @param flowName the name of the flow + * @return the generated id + */ + default String generateFlowId(final String flowName) { + return UUID.randomUUID().toString(); + } + } diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 00ee910d70..6501ad0e66 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -872,6 +872,12 @@ language governing permissions and limitations under the License. 
--> <version>2.0.0-SNAPSHOT</version> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-github-nar</artifactId> + <version>2.0.0-SNAPSHOT</version> + <type>nar</type> + </dependency> <!-- AspectJ library needed by the Java Agent used for native library loading (see bootstrap.conf) --> <dependency> <groupId>org.aspectj</groupId> diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/pom.xml b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/pom.xml new file mode 100644 index 0000000000..be73f95343 --- /dev/null +++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/pom.xml @@ -0,0 +1,43 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-github-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-github-extensions</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-jakarta-xmlbind-annotations</artifactId> + </dependency> + <dependency> + <groupId>org.kohsuke</groupId> + <artifactId>github-api</artifactId> + <version>${github-api.version}</version> + </dependency> + </dependencies> +</project> diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/FlowSnapshotSerializer.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/FlowSnapshotSerializer.java new file mode 100644 index 0000000000..9f464e1d88 --- /dev/null +++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/FlowSnapshotSerializer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.github; + +import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Serializer for flow snapshots. + */ +public interface FlowSnapshotSerializer { + + String serialize(final RegisteredFlowSnapshot flowSnapshot) throws IOException; + + RegisteredFlowSnapshot deserialize(final InputStream inputStream) throws IOException; + +} diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubAuthenticationType.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubAuthenticationType.java new file mode 100644 index 0000000000..964c376e8d --- /dev/null +++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubAuthenticationType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.github; + +/** + * Enumeration of authentication types for the GitHub client. + */ +public enum GitHubAuthenticationType { + + NONE, + PERSONAL_ACCESS_TOKEN, + APP_INSTALLATION_TOKEN; + +} diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubCreateContentRequest.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubCreateContentRequest.java new file mode 100644 index 0000000000..f1864eba41 --- /dev/null +++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubCreateContentRequest.java @@ -0,0 +1,101 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.nifi.github; + +import java.util.Objects; + +public class GitHubCreateContentRequest { + + private final String branch; + private final String path; + private final String content; + private final String message; + private final String existingContentSha; + + private GitHubCreateContentRequest(final Builder builder) { + this.branch = Objects.requireNonNull(builder.branch); + this.path = Objects.requireNonNull(builder.path); + this.content = Objects.requireNonNull(builder.content); + this.message = Objects.requireNonNull(builder.message); + // Will be null for create, and populated for update + this.existingContentSha = builder.existingContentSha; + } + + public String getBranch() { + return branch; + } + + public String getPath() { + return path; + } + + public String getContent() { + return content; + } + + public String getMessage() { + return message; + } + + public String getExistingContentSha() { + return existingContentSha; + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private String branch; + private String path; + private String content; + private String message; + private String existingContentSha; + + public Builder branch(final String branch) { + this.branch = branch; + return this; + } + + public Builder path(final String path) { + this.path = path; + return this; + } + + public Builder content(final String content) { + this.content = content; + return this; + } + + public Builder message(final String message) { + this.message = message; + return this; + } + + public Builder existingContentSha(final String existingSha) { + this.existingContentSha = existingSha; + return this; + } + + public GitHubCreateContentRequest build() { + return new GitHubCreateContentRequest(this); + } + } +} diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubFlowRegistryClient.java 
b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubFlowRegistryClient.java new file mode 100644 index 0000000000..34fce1d4cb --- /dev/null +++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubFlowRegistryClient.java @@ -0,0 +1,677 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +package org.apache.nifi.github; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flow.ConnectableComponent; +import org.apache.nifi.flow.Position; +import org.apache.nifi.flow.VersionedComponent; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flow.VersionedFlowCoordinates; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.registry.flow.AbstractFlowRegistryClient; +import org.apache.nifi.registry.flow.AuthorizationException; +import org.apache.nifi.registry.flow.BucketLocation; +import org.apache.nifi.registry.flow.FlowAlreadyExistsException; +import org.apache.nifi.registry.flow.FlowLocation; +import org.apache.nifi.registry.flow.FlowRegistryBranch; +import org.apache.nifi.registry.flow.FlowRegistryBucket; +import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext; +import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext; +import org.apache.nifi.registry.flow.FlowRegistryException; +import org.apache.nifi.registry.flow.FlowRegistryPermissions; +import org.apache.nifi.registry.flow.FlowVersionLocation; +import org.apache.nifi.registry.flow.RegisterAction; +import org.apache.nifi.registry.flow.RegisteredFlow; +import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; +import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; +import org.kohsuke.github.GHCommit; +import org.kohsuke.github.GHContent; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; 
+import java.util.stream.Collectors; + +/** + * Implementation of {@link org.apache.nifi.registry.flow.FlowRegistryClient} that uses GitHub for version controlling flows. + */ +public class GitHubFlowRegistryClient extends AbstractFlowRegistryClient { + + static final PropertyDescriptor GITHUB_API_URL = new PropertyDescriptor.Builder() + .name("GitHub API URL") + .description("The URL of the GitHub API") + .addValidator(StandardValidators.URL_VALIDATOR) + .defaultValue("https://api.github.com/") + .required(true) + .build(); + + static final PropertyDescriptor REPOSITORY_NAME = new PropertyDescriptor.Builder() + .name("Repository Name") + .description("The name of the repository") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .required(true) + .build(); + + static final PropertyDescriptor REPOSITORY_OWNER = new PropertyDescriptor.Builder() + .name("Repository Owner") + .description("The owner of the repository") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .required(true) + .build(); + + static final PropertyDescriptor REPOSITORY_BRANCH = new PropertyDescriptor.Builder() + .name("Default Branch") + .description("The default branch to use for this client") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue("main") + .required(true) + .build(); + + static final PropertyDescriptor REPOSITORY_PATH = new PropertyDescriptor.Builder() + .name("Repository Path") + .description("The path with in the repository that this client will use to store all data. 
" + + "If left blank, then the root of the repository will be used.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .required(false) + .build(); + + static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder() + .name("Authentication Type") + .description("The type of authentication to use for accessing GitHub") + .allowableValues(GitHubAuthenticationType.values()) + .defaultValue(GitHubAuthenticationType.NONE.name()) + .required(true) + .build(); + + static final PropertyDescriptor PERSONAL_ACCESS_TOKEN = new PropertyDescriptor.Builder() + .name("Personal Access Token") + .description("The personal access token to use for authentication") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .required(true) + .sensitive(true) + .dependsOn(AUTHENTICATION_TYPE, GitHubAuthenticationType.PERSONAL_ACCESS_TOKEN.name()) + .build(); + + static final PropertyDescriptor APP_INSTALLATION_TOKEN = new PropertyDescriptor.Builder() + .name("App Installation Token") + .description("The app installation token to use for authentication") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .required(true) + .sensitive(true) + .dependsOn(AUTHENTICATION_TYPE, GitHubAuthenticationType.APP_INSTALLATION_TOKEN.name()) + .build(); + + static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of( + GITHUB_API_URL, + REPOSITORY_OWNER, + REPOSITORY_NAME, + REPOSITORY_BRANCH, + REPOSITORY_PATH, + AUTHENTICATION_TYPE, + PERSONAL_ACCESS_TOKEN, + APP_INSTALLATION_TOKEN + ); + + static final String DEFAULT_BUCKET_NAME = "default"; + static final String DEFAULT_BUCKET_KEEP_FILE_PATH = DEFAULT_BUCKET_NAME + "/.keep"; + static final String DEFAULT_BUCKET_KEEP_FILE_CONTENT = "Do Not Delete"; + static final String DEFAULT_BUCKET_KEEP_FILE_MESSAGE = "Creating default bucket"; + + static final String REGISTER_FLOW_MESSAGE_PREFIX = "Registering Flow"; + static final String REGISTER_FLOW_MESSAGE_FORMAT = REGISTER_FLOW_MESSAGE_PREFIX + " [%s]"; + 
static final String DEREGISTER_FLOW_MESSAGE_FORMAT = "Deregistering Flow [%s]"; + static final String DEFAULT_FLOW_SNAPSHOT_MESSAGE_FORMAT = "Saving Flow Snapshot %s"; + static final String SNAPSHOT_FILE_EXTENSION = ".json"; + static final String SNAPSHOT_FILE_PATH_FORMAT = "%s/%s" + SNAPSHOT_FILE_EXTENSION; + static final String FLOW_CONTENTS_GROUP_ID = "flow-contents-group"; + + static final String STORAGE_LOCATION_PREFIX = "g...@github.com:"; + static final String STORAGE_LOCATION_FORMAT = STORAGE_LOCATION_PREFIX + "%s/%s.git"; + + private volatile FlowSnapshotSerializer flowSnapshotSerializer; + private volatile GitHubRepositoryClient repositoryClient; + private final AtomicBoolean clientInitialized = new AtomicBoolean(false); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext)); + + final String repoPath = validationContext.getProperty(REPOSITORY_PATH).getValue(); + if (repoPath != null && (repoPath.startsWith("/") || repoPath.endsWith("/"))) { + results.add(new ValidationResult.Builder() + .subject(REPOSITORY_PATH.getDisplayName()) + .valid(false) + .explanation("Path can not start or end with /") + .build()); + } + + return results; + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); + synchronized (this) { + clientInitialized.set(false); + } + } + + @Override + public void initialize(final FlowRegistryClientInitializationContext context) { + super.initialize(context); + flowSnapshotSerializer = createFlowSnapshotSerializer(context); + } + + // protected to allow for overriding from tests + protected FlowSnapshotSerializer 
createFlowSnapshotSerializer(final FlowRegistryClientInitializationContext initializationContext) { + return new JacksonFlowSnapshotSerializer(); + } + + @Override + public boolean isStorageLocationApplicable(final FlowRegistryClientConfigurationContext context, final String location) { + return location != null && location.startsWith(STORAGE_LOCATION_PREFIX); + } + + @Override + public boolean isBranchingSupported(final FlowRegistryClientConfigurationContext context) { + return true; + } + + @Override + public Set<FlowRegistryBranch> getBranches(final FlowRegistryClientConfigurationContext context) throws FlowRegistryException, IOException { + final GitHubRepositoryClient repositoryClient = getRepositoryClient(context); + verifyReadPermissions(repositoryClient); + + return repositoryClient.getBranches().stream() + .map(branchName -> { + final FlowRegistryBranch flowRegistryBranch = new FlowRegistryBranch(); + flowRegistryBranch.setName(branchName); + return flowRegistryBranch; + }).collect(Collectors.toSet()); + } + + @Override + public FlowRegistryBranch getDefaultBranch(final FlowRegistryClientConfigurationContext context) { + final FlowRegistryBranch defaultBranch = new FlowRegistryBranch(); + defaultBranch.setName(context.getProperty(REPOSITORY_BRANCH).getValue()); + return defaultBranch; + } + + @Override + public Set<FlowRegistryBucket> getBuckets(final FlowRegistryClientConfigurationContext context, final String branch) throws IOException, FlowRegistryException { + final GitHubRepositoryClient repositoryClient = getRepositoryClient(context); + verifyReadPermissions(repositoryClient); + + final Set<FlowRegistryBucket> buckets = repositoryClient.getTopLevelDirectoryNames(branch).stream() + .map(bucketName -> createFlowRegistryBucket(repositoryClient, bucketName)) + .collect(Collectors.toSet()); + + // if the repository has no top-level directories, then return a default bucket entry, this won't exist in the repository until the first time a flow is saved to 
it + return buckets.isEmpty() ? Set.of(createFlowRegistryBucket(repositoryClient, DEFAULT_BUCKET_NAME)) : buckets; + } + + @Override + public FlowRegistryBucket getBucket(final FlowRegistryClientConfigurationContext context, final BucketLocation bucketLocation) throws FlowRegistryException, IOException { + final GitHubRepositoryClient repositoryClient = getRepositoryClient(context); + verifyReadPermissions(repositoryClient); + return createFlowRegistryBucket(repositoryClient, bucketLocation.getBucketId()); + } + + @Override + public RegisteredFlow registerFlow(final FlowRegistryClientConfigurationContext context, final RegisteredFlow flow) throws FlowRegistryException, IOException { + final GitHubRepositoryClient repositoryClient = getRepositoryClient(context); + verifyWritePermissions(repositoryClient); + + final String branch = flow.getBranch(); + final FlowLocation flowLocation = new FlowLocation(branch, flow.getBucketIdentifier(), flow.getIdentifier()); + final String filePath = getSnapshotFilePath(flowLocation); + final String commitMessage = REGISTER_FLOW_MESSAGE_FORMAT.formatted(flow.getIdentifier()); + + final Optional<String> existingFileSha = repositoryClient.getContentSha(filePath, branch); + if (existingFileSha.isPresent()) { + throw new FlowAlreadyExistsException("Another flow is already registered at [" + filePath + "] on branch [" + branch + "]"); + } + + // Clear values we don't want in the json stored in GitHub + final String originalBucketId = flow.getBucketIdentifier(); + flow.setBucketIdentifier(null); + flow.setBucketName(null); + flow.setBranch(null); + + final RegisteredFlowSnapshot flowSnapshot = new RegisteredFlowSnapshot(); + flowSnapshot.setFlow(flow); + + final GitHubCreateContentRequest request = GitHubCreateContentRequest.builder() + .branch(branch) + .path(filePath) + .content(flowSnapshotSerializer.serialize(flowSnapshot)) + .message(commitMessage) + .build(); + + repositoryClient.createContent(request); + + // Re-populate fields 
before returning + flow.setBucketName(originalBucketId); + flow.setBucketIdentifier(originalBucketId); + flow.setBranch(branch); + + return flow; + } + + @Override + public RegisteredFlow deregisterFlow(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation) throws FlowRegistryException, IOException { + final GitHubRepositoryClient repositoryClient = getRepositoryClient(context); + verifyWritePermissions(repositoryClient); + + final String branch = flowLocation.getBranch(); + final String filePath = getSnapshotFilePath(flowLocation); + final String commitMessage = DEREGISTER_FLOW_MESSAGE_FORMAT.formatted(flowLocation.getFlowId()); + final GHContent deletedSnapshotContent = repositoryClient.deleteContent(filePath, commitMessage, branch); + + final RegisteredFlowSnapshot deletedSnapshot = getSnapshot(deletedSnapshotContent.read()); + updateBucketReferences(repositoryClient, deletedSnapshot, flowLocation.getBucketId()); + return deletedSnapshot.getFlow(); + } + + @Override + public RegisteredFlow getFlow(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation) throws FlowRegistryException, IOException { + final GitHubRepositoryClient repositoryClient = getRepositoryClient(context); + verifyReadPermissions(repositoryClient); + + final String branch = flowLocation.getBranch(); + final String filePath = getSnapshotFilePath(flowLocation); + + final RegisteredFlowSnapshot existingSnapshot = getSnapshot(filePath, branch); + populateFlowAndSnapshotMetadata(existingSnapshot, flowLocation); + updateBucketReferences(repositoryClient, existingSnapshot, flowLocation.getBucketId()); + + final RegisteredFlow registeredFlow = existingSnapshot.getFlow(); + registeredFlow.setBranch(branch); + return registeredFlow; + } + + @Override + public Set<RegisteredFlow> getFlows(final FlowRegistryClientConfigurationContext context, final BucketLocation bucketLocation) throws IOException, FlowRegistryException { + final 
GitHubRepositoryClient repositoryClient = getRepositoryClient(context); + verifyReadPermissions(repositoryClient); + + final String branch = bucketLocation.getBranch(); + final String bucketId = bucketLocation.getBucketId(); + + return repositoryClient.getFileNames(bucketId, branch).stream() + .filter(filename -> filename.endsWith(SNAPSHOT_FILE_EXTENSION)) + .map(filename -> mapToRegisteredFlow(bucketLocation, filename)) + .collect(Collectors.toSet()); + } + + @Override + public RegisteredFlowSnapshot getFlowContents(final FlowRegistryClientConfigurationContext context, final FlowVersionLocation flowVersionLocation) + throws FlowRegistryException, IOException { + final GitHubRepositoryClient repositoryClient = getRepositoryClient(context); + verifyReadPermissions(repositoryClient); + + final String version = flowVersionLocation.getVersion(); + final String filePath = getSnapshotFilePath(flowVersionLocation); + + final InputStream inputStream = repositoryClient.getContentFromCommit(filePath, version); + final RegisteredFlowSnapshot flowSnapshot = getSnapshot(inputStream); + populateFlowAndSnapshotMetadata(flowSnapshot, flowVersionLocation); + + // populate values that aren't store in GitHub + flowSnapshot.getSnapshotMetadata().setVersion(version); + flowSnapshot.getSnapshotMetadata().setBranch(flowVersionLocation.getBranch()); + flowSnapshot.getFlow().setBranch(flowVersionLocation.getBranch()); + + // populate outgoing bucket references + updateBucketReferences(repositoryClient, flowSnapshot, flowVersionLocation.getBucketId()); + + // determine if the version is the "latest" version by comparing to the response of getLatestVersion + final String latestVersion = getLatestVersion(context, flowVersionLocation).orElse(null); + flowSnapshot.setLatest(version.equals(latestVersion)); + + return flowSnapshot; + } + + @Override + public RegisteredFlowSnapshot registerFlowSnapshot(final FlowRegistryClientConfigurationContext context, final RegisteredFlowSnapshot flowSnapshot, 
final RegisterAction action)
+            throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyWritePermissions(repositoryClient);
+
+        final RegisteredFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
+        final String branch = snapshotMetadata.getBranch();
+        final FlowLocation flowLocation = new FlowLocation(snapshotMetadata.getBranch(), snapshotMetadata.getBucketIdentifier(), snapshotMetadata.getFlowIdentifier());
+        final String filePath = getSnapshotFilePath(flowLocation);
+        final String previousSha = repositoryClient.getContentSha(filePath, branch).orElse(null);
+
+        final String snapshotComments = snapshotMetadata.getComments();
+        final String commitMessage = StringUtils.isBlank(snapshotComments) ? DEFAULT_FLOW_SNAPSHOT_MESSAGE_FORMAT.formatted(flowLocation.getFlowId()) : snapshotComments;
+
+        final RegisteredFlowSnapshot existingSnapshot = getSnapshot(filePath, branch);
+        populateFlowAndSnapshotMetadata(existingSnapshot, flowLocation);
+
+        final RegisteredFlow existingFlow = existingSnapshot.getFlow();
+        existingFlow.setBranch(null);
+        flowSnapshot.setFlow(existingFlow);
+
+        // Clear values we don't want stored in the json in GitHub
+        flowSnapshot.setBucket(null);
+        flowSnapshot.getSnapshotMetadata().setBucketIdentifier(null);
+        flowSnapshot.getSnapshotMetadata().setBranch(null);
+        flowSnapshot.getSnapshotMetadata().setVersion(null);
+        flowSnapshot.getSnapshotMetadata().setComments(null);
+        flowSnapshot.getSnapshotMetadata().setTimestamp(0);
+
+        // replace the id of the top level group and all of its references with a constant value prior to serializing to avoid
+        // unnecessary diffs when different instances of the same flow are imported and have different top-level PG ids
+        final String originalFlowContentsGroupId = replaceGroupId(flowSnapshot.getFlowContents(), FLOW_CONTENTS_GROUP_ID);
+        final Position originalFlowContentsPosition = replacePosition(flowSnapshot.getFlowContents(), new Position(0, 0));
+
+        final GitHubCreateContentRequest createContentRequest = GitHubCreateContentRequest.builder()
+                .branch(branch)
+                .path(filePath)
+                .content(flowSnapshotSerializer.serialize(flowSnapshot))
+                .message(commitMessage)
+                .existingContentSha(previousSha)
+                .build();
+
+        final String createContentCommitSha = repositoryClient.createContent(createContentRequest);
+
+        final VersionedFlowCoordinates versionedFlowCoordinates = new VersionedFlowCoordinates();
+        versionedFlowCoordinates.setRegistryId(getIdentifier());
+        versionedFlowCoordinates.setBranch(flowLocation.getBranch());
+        versionedFlowCoordinates.setBucketId(flowLocation.getBucketId());
+        versionedFlowCoordinates.setFlowId(flowLocation.getFlowId());
+        versionedFlowCoordinates.setVersion(createContentCommitSha);
+        versionedFlowCoordinates.setStorageLocation(getStorageLocation(repositoryClient));
+
+        flowSnapshot.getFlowContents().setVersionedFlowCoordinates(versionedFlowCoordinates);
+        flowSnapshot.getFlow().setBranch(branch);
+        flowSnapshot.getSnapshotMetadata().setBranch(branch);
+        flowSnapshot.getSnapshotMetadata().setVersion(createContentCommitSha);
+        flowSnapshot.setLatest(true);
+
+        // populate outgoing bucket references
+        updateBucketReferences(repositoryClient, flowSnapshot, flowLocation.getBucketId());
+
+        // set back to the original id so that the returned snapshot has the correct values from what was passed in
+        replaceGroupId(flowSnapshot.getFlowContents(), originalFlowContentsGroupId);
+        replacePosition(flowSnapshot.getFlowContents(), originalFlowContentsPosition);
+
+        return flowSnapshot;
+    }
+
+    /**
+     * Lists the versions of a flow as one metadata entry per commit that touched the flow's snapshot file.
+     * Commits whose message starts with REGISTER_FLOW_MESSAGE_PREFIX are not reported as versions.
+     *
+     * @param context the configuration context used to obtain the repository client
+     * @param flowLocation branch, bucket and flow identifying the snapshot file
+     * @return the snapshot metadata, in the order the commits were returned by the repository client
+     * @throws FlowRegistryException if the client lacks read access or GitHub reports a non I/O error
+     * @throws IOException if an I/O error happens calling GitHub
+     */
+    @Override
+    public Set<RegisteredFlowSnapshotMetadata> getFlowVersions(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation)
+            throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyReadPermissions(repositoryClient);
+
+        final String branch = flowLocation.getBranch();
+        final String filePath = getSnapshotFilePath(flowLocation);
+
+        final Set<RegisteredFlowSnapshotMetadata> snapshotMetadataSet = new LinkedHashSet<>();
+        for (final GHCommit ghCommit : repositoryClient.getCommits(filePath, branch)) {
+            // Check the message before building metadata so that skipped registration commits are never
+            // passed to createSnapshotMetadata (which resolves the commit author)
+            final String commitMessage = ghCommit.getCommitShortInfo().getMessage();
+            if (commitMessage != null && commitMessage.startsWith(REGISTER_FLOW_MESSAGE_PREFIX)) {
+                continue;
+            }
+            snapshotMetadataSet.add(createSnapshotMetadata(ghCommit, flowLocation));
+        }
+        return snapshotMetadataSet;
+    }
+
+    /**
+     * Returns the SHA of the first commit listed for the flow's snapshot file, or empty when there are none.
+     * NOTE(review): assumes getCommits lists the newest commit first; unlike getFlowVersions this does not
+     * skip REGISTER_FLOW_MESSAGE_PREFIX commits — confirm that is intended.
+     *
+     * @param context the configuration context used to obtain the repository client
+     * @param flowLocation branch, bucket and flow identifying the snapshot file
+     * @return the latest commit SHA, if any
+     * @throws FlowRegistryException if the client lacks read access or GitHub reports a non I/O error
+     * @throws IOException if an I/O error happens calling GitHub
+     */
+    @Override
+    public Optional<String> getLatestVersion(final FlowRegistryClientConfigurationContext context, final FlowLocation flowLocation) throws FlowRegistryException, IOException {
+        final GitHubRepositoryClient repositoryClient = getRepositoryClient(context);
+        verifyReadPermissions(repositoryClient);
+
+        final String branch = flowLocation.getBranch();
+        final String filePath = getSnapshotFilePath(flowLocation);
+
+        final List<GHCommit> commits = repositoryClient.getCommits(filePath, branch);
+        final String latestVersion = commits.isEmpty() ? null : commits.getFirst().getSHA1();
+        return Optional.ofNullable(latestVersion);
+    }
+
+    /**
+     * Derives a flow id from a flow name: trims it, turns each whitespace character into '-', removes every
+     * character that is not an ASCII letter, digit or '-', then collapses runs of '-' into a single '-'.
+     *
+     * @param flowName the user supplied flow name
+     * @return the derived flow id
+     */
+    @Override
+    public String generateFlowId(final String flowName) {
+        return flowName.trim()
+                .replaceAll("\\s", "-") // replace whitespace with -
+                .replaceAll("[^a-zA-Z0-9-]", "") // replace all other invalid chars with empty string
+                .replaceAll("(-)\\1+", "$1"); // replace consecutive - with single -
+    }
+
+    /**
+     * Builds a bucket whose identifier and name are both {@code name}. Read/write permissions come from the
+     * repository client, and delete permission mirrors write permission.
+     */
+    private FlowRegistryBucket createFlowRegistryBucket(final GitHubRepositoryClient repositoryClient, final String name) {
+        final FlowRegistryPermissions bucketPermissions = new FlowRegistryPermissions();
+        bucketPermissions.setCanRead(repositoryClient.getCanRead());
+        bucketPermissions.setCanWrite(repositoryClient.getCanWrite());
+        bucketPermissions.setCanDelete(repositoryClient.getCanWrite());
+
+        final FlowRegistryBucket bucket = new FlowRegistryBucket();
+        bucket.setIdentifier(name);
+        bucket.setName(name);
+        bucket.setPermissions(bucketPermissions);
+        return bucket;
+    }
+
+    /**
+     * Converts a commit on the flow's snapshot file into snapshot metadata. The commit SHA becomes the version,
+     * and the commit message and date become the comments and timestamp.
+     */
+    private RegisteredFlowSnapshotMetadata createSnapshotMetadata(final GHCommit ghCommit, final FlowLocation flowLocation) throws IOException {
+        final GHCommit.ShortInfo shortInfo = ghCommit.getCommitShortInfo();
+
+        final RegisteredFlowSnapshotMetadata snapshotMetadata = new RegisteredFlowSnapshotMetadata();
+        snapshotMetadata.setBranch(flowLocation.getBranch());
+        snapshotMetadata.setBucketIdentifier(flowLocation.getBucketId());
+        snapshotMetadata.setFlowIdentifier(flowLocation.getFlowId());
+        snapshotMetadata.setVersion(ghCommit.getSHA1());
+        snapshotMetadata.setAuthor(getCommitAuthorName(ghCommit, shortInfo));
+        snapshotMetadata.setComments(shortInfo.getMessage());
+        snapshotMetadata.setTimestamp(shortInfo.getCommitDate().getTime());
+        return snapshotMetadata;
+    }
+
+    /**
+     * Returns the GitHub login of the commit author. GHCommit.getAuthor() returns null when the commit's author
+     * is not linked to a GitHub account; in that case, fall back to the name recorded in the git commit itself.
+     */
+    private String getCommitAuthorName(final GHCommit ghCommit, final GHCommit.ShortInfo shortInfo) throws IOException {
+        final var gitHubAuthor = ghCommit.getAuthor();
+        if (gitHubAuthor != null) {
+            return gitHubAuthor.getLogin();
+        }
+        final var gitAuthor = shortInfo.getAuthor();
+        return gitAuthor == null ? null : gitAuthor.getName();
+    }
+
+    /**
+     * Builds a RegisteredFlow for a snapshot file found in a bucket directory. The flow id/name is the filename
+     * with a trailing SNAPSHOT_FILE_EXTENSION removed, and the bucket name equals the bucket id.
+     */
+    private RegisteredFlow mapToRegisteredFlow(final BucketLocation bucketLocation, final String filename) {
+        final String branch = bucketLocation.getBranch();
+        final String bucketId = bucketLocation.getBucketId();
+        // strip only the trailing extension; String.replace would also remove occurrences inside the name
+        final String flowId = filename.endsWith(SNAPSHOT_FILE_EXTENSION)
+                ? filename.substring(0, filename.length() - SNAPSHOT_FILE_EXTENSION.length())
+                : filename;
+
+        final RegisteredFlow registeredFlow = new RegisteredFlow();
+        registeredFlow.setIdentifier(flowId);
+        registeredFlow.setName(flowId);
+        registeredFlow.setBranch(branch);
+        registeredFlow.setBucketIdentifier(bucketId);
+        registeredFlow.setBucketName(bucketId);
+        return registeredFlow;
+    }
+
+    /**
+     * @return the repository path of the snapshot file for the given flow, built from SNAPSHOT_FILE_PATH_FORMAT
+     */
+    private String getSnapshotFilePath(final FlowLocation flowLocation) {
+        return SNAPSHOT_FILE_PATH_FORMAT.formatted(flowLocation.getBucketId(), flowLocation.getFlowId());
+    }
+
+    /**
+     * Reads and deserializes the latest snapshot at {@code filePath} on {@code branch}.
+     * Uses the {@code repositoryClient} field, so it must only be called after getRepositoryClient has run.
+     */
+    private RegisteredFlowSnapshot getSnapshot(final String filePath, final String branch) throws IOException, FlowRegistryException {
+        try (final InputStream contentInputStream = repositoryClient.getContentFromBranch(filePath, branch)) {
+            return flowSnapshotSerializer.deserialize(contentInputStream);
+        }
+    }
+
+    /**
+     * Deserializes a snapshot from the given stream and always closes the stream quietly afterwards.
+     */
+    private RegisteredFlowSnapshot getSnapshot(final InputStream inputStream) throws IOException {
+        try {
+            return flowSnapshotSerializer.deserialize(inputStream);
+        } finally {
+            IOUtils.closeQuietly(inputStream);
+        }
+    }
+
+    /**
+     * Sets the group's position to {@code newPosition}.
+     *
+     * @return the position the group had before the call
+     */
+    private Position replacePosition(final VersionedProcessGroup group, final Position newPosition) {
+        final Position originalPosition = group.getPosition();
+        group.setPosition(newPosition);
+        return originalPosition;
+    }
+
+    /**
+     * Sets the group's identifier to {@code newGroupId} and points the group identifier of its direct children at it.
+     * Connection endpoints that referenced the old id are updated as well.
+     *
+     * @return the identifier the group had before the call
+     */
+    private String replaceGroupId(final VersionedProcessGroup group, final String newGroupId) {
+        final String originalGroupId = group.getIdentifier();
+        group.setIdentifier(newGroupId);
+
+        replaceGroupId(group.getProcessGroups(), newGroupId);
+        replaceGroupId(group.getRemoteProcessGroups(), newGroupId);
+        replaceGroupId(group.getProcessors(), newGroupId);
+        replaceGroupId(group.getFunnels(), newGroupId);
+        replaceGroupId(group.getLabels(), newGroupId);
+        replaceGroupId(group.getInputPorts(), newGroupId);
+        replaceGroupId(group.getOutputPorts(), newGroupId);
+        replaceGroupId(group.getControllerServices(), newGroupId);
+        replaceGroupId(group.getConnections(), newGroupId);
+
+        if (group.getConnections() != null) {
+            for (final VersionedConnection connection : group.getConnections()) {
+                replaceGroupId(connection.getSource(), originalGroupId, newGroupId);
+                replaceGroupId(connection.getDestination(), originalGroupId, newGroupId);
+            }
+        }
+
+        return originalGroupId;
+    }
+
+    /**
+     * Sets the group identifier of every component in the collection; a null collection is ignored.
+     */
+    private <T extends VersionedComponent> void replaceGroupId(final Collection<T> components, final String newGroupIdentifier) {
+        if (components == null) {
+            return;
+        }
+        components.forEach(c -> c.setGroupIdentifier(newGroupIdentifier));
+    }
+
+    /**
+     * Repoints a connection endpoint from {@code originalGroupId} to {@code newGroupId}. Endpoints in other groups
+     * are untouched. A null component or a null original id (a group without an identifier) leaves it unchanged.
+     */
+    private void replaceGroupId(final ConnectableComponent connectableComponent, final String originalGroupId, final String newGroupId) {
+        if (connectableComponent == null || originalGroupId == null) {
+            return;
+        }
+
+        if (originalGroupId.equals(connectableComponent.getGroupId())) {
+            connectableComponent.setGroupId(newGroupId);
+        }
+    }
+
+    /**
+     * Attaches a bucket built from {@code bucketId} to the snapshot and sets the bucket id/name on the flow and
+     * snapshot metadata.
+     */
+    private void updateBucketReferences(final GitHubRepositoryClient repositoryClient, final RegisteredFlowSnapshot flowSnapshot, final String bucketId) {
+        final FlowRegistryBucket bucket = createFlowRegistryBucket(repositoryClient, bucketId);
+        flowSnapshot.setBucket(bucket);
+
+        final RegisteredFlow flow = flowSnapshot.getFlow();
+        flow.setBucketName(bucketId);
+        flow.setBucketIdentifier(bucketId);
+
+        final RegisteredFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
+        snapshotMetadata.setBucketIdentifier(bucketId);
+    }
+
+    // Ensures the snapshot has non-null flow and metadata fields, which would only be null if taking a flow from "Download Flow Definition" and adding directly to GitHub
+    private void populateFlowAndSnapshotMetadata(final RegisteredFlowSnapshot flowSnapshot, final FlowLocation flowLocation) {
+        if (flowSnapshot.getFlow() == null) {
+            final RegisteredFlow registeredFlow = new RegisteredFlow();
+            registeredFlow.setName(flowLocation.getFlowId());
+            registeredFlow.setIdentifier(flowLocation.getFlowId());
+            flowSnapshot.setFlow(registeredFlow);
+        }
+        if (flowSnapshot.getSnapshotMetadata() == null) {
+            final RegisteredFlowSnapshotMetadata snapshotMetadata = new RegisteredFlowSnapshotMetadata();
+            snapshotMetadata.setFlowIdentifier(flowLocation.getFlowId());
+            flowSnapshot.setSnapshotMetadata(snapshotMetadata);
+        }
+    }
+
+    /**
+     * @return the storage location string for the repository, built from STORAGE_LOCATION_FORMAT with owner and name
+     */
+    private String getStorageLocation(final GitHubRepositoryClient repositoryClient) {
+        return STORAGE_LOCATION_FORMAT.formatted(repositoryClient.getRepoOwner(), repositoryClient.getRepoName());
+    }
+
+    /**
+     * @throws AuthorizationException if the configured credentials cannot write to the repository
+     */
+    private void verifyWritePermissions(final GitHubRepositoryClient repositoryClient) throws AuthorizationException {
+        if (!repositoryClient.getCanWrite()) {
+            throw new AuthorizationException("Client does not have write access to the GitHub repository");
+        }
+    }
+
+    /**
+     * @throws AuthorizationException if the configured credentials cannot read the repository
+     */
+    private void verifyReadPermissions(final GitHubRepositoryClient repositoryClient) throws AuthorizationException {
+        if (!repositoryClient.getCanRead()) {
+            throw new AuthorizationException("Client does not have read access to the GitHub repository");
+        }
+    }
+
+    /**
+     * Returns the shared repository client, creating it on the first call. The initialized flag is set before
+     * initializeDefaultBucket runs, so a failure while creating the default bucket is not retried on later calls.
+     */
+    private synchronized GitHubRepositoryClient getRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+        if (!clientInitialized.get()) {
+            getLogger().info("Initializing GitHub repository client");
+            repositoryClient = createRepositoryClient(context);
+            clientInitialized.set(true);
+            initializeDefaultBucket(context);
+        }
+        return repositoryClient;
+    }
+
+    // protected so can be overridden during tests
+    protected GitHubRepositoryClient createRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+        return GitHubRepositoryClient.builder()
+                .apiUrl(context.getProperty(GITHUB_API_URL).getValue())
+                .authenticationType(GitHubAuthenticationType.valueOf(context.getProperty(AUTHENTICATION_TYPE).getValue()))
+                .personalAccessToken(context.getProperty(PERSONAL_ACCESS_TOKEN).getValue())
+                .appInstallationToken(context.getProperty(APP_INSTALLATION_TOKEN).getValue())
+                .repoOwner(context.getProperty(REPOSITORY_OWNER).getValue())
+                .repoName(context.getProperty(REPOSITORY_NAME).getValue())
+                .repoPath(context.getProperty(REPOSITORY_PATH).getValue())
+                .build();
+    }
+
+    // If the client has write permissions to the repo, then ensure the directory for the default bucket is present and if not create it,
+    // otherwise the client can only be used to import flows from the repo and won't be able to set up the default bucket
+    private void initializeDefaultBucket(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException {
+        if (!repositoryClient.getCanWrite()) {
+            getLogger().info("GitHub repository client does not have write permissions to the repository, skipping setup of default bucket");
+            return;
+        }
+
+        final String branch = context.getProperty(REPOSITORY_BRANCH).getValue();
+        final Set<String> bucketDirectoryNames = repositoryClient.getTopLevelDirectoryNames(branch);
+        if (!bucketDirectoryNames.isEmpty()) {
+            getLogger().info("Found existing buckets, skipping setup of default bucket");
+            return;
+        }
+
+        getLogger().info("Creating default bucket in repo [{}/{}] on branch [{}]", repositoryClient.getRepoOwner(), repositoryClient.getRepoName(), branch);
+
+        repositoryClient.createContent(
+                GitHubCreateContentRequest.builder()
+                        .branch(branch)
+                        .path(DEFAULT_BUCKET_KEEP_FILE_PATH)
+                        .content(DEFAULT_BUCKET_KEEP_FILE_CONTENT)
+                        .message(DEFAULT_BUCKET_KEEP_FILE_MESSAGE)
+                        .build()
+        );
+    }
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java
new file mode 100644
index 0000000000..fc85db0fc8
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/GitHubRepositoryClient.java
@@ -0,0 +1,471 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+
* contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.github;
+
+import org.apache.nifi.registry.flow.FlowRegistryException;
+import org.kohsuke.github.GHCommit;
+import org.kohsuke.github.GHContent;
+import org.kohsuke.github.GHContentUpdateResponse;
+import org.kohsuke.github.GHMyself;
+import org.kohsuke.github.GHPermissionType;
+import org.kohsuke.github.GHRef;
+import org.kohsuke.github.GHRepository;
+import org.kohsuke.github.GitHub;
+import org.kohsuke.github.GitHubBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Client to encapsulate access to a GitHub Repository through the Hub4j GitHub client.
+ */
+public class GitHubRepositoryClient {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GitHubRepositoryClient.class);
+
+    private static final String BRANCH_REF_PATTERN = "refs/heads/%s";
+    private static final int COMMIT_PAGE_SIZE = 50;
+
+    private final String repoOwner;
+    private final String repoName;
+    // Directory inside the repository that all paths are resolved against, without leading/trailing slashes;
+    // null means paths are resolved against the repository root
+    private final String repoPath;
+
+    private final GitHub gitHub;
+    private final GHRepository repository;
+    private final GitHubAuthenticationType authenticationType;
+    private final boolean canRead;
+    private final boolean canWrite;
+
+    private GitHubRepositoryClient(final Builder builder) throws IOException, FlowRegistryException {
+        final String apiUrl = Objects.requireNonNull(builder.apiUrl, "API URL is required");
+        final GitHubBuilder gitHubBuilder = new GitHubBuilder().withEndpoint(apiUrl);
+
+        repoPath = normalizeRepoPath(builder.repoPath);
+        repoOwner = Objects.requireNonNull(builder.repoOwner, "Repository Owner is required");
+        repoName = Objects.requireNonNull(builder.repoName, "Repository Name is required");
+        authenticationType = Objects.requireNonNull(builder.authenticationType, "Authentication Type is required");
+
+        switch (authenticationType) {
+            case PERSONAL_ACCESS_TOKEN -> gitHubBuilder.withOAuthToken(builder.personalAccessToken);
+            case APP_INSTALLATION_TOKEN -> gitHubBuilder.withAppInstallationToken(builder.appInstallationToken);
+        }
+
+        gitHub = gitHubBuilder.build();
+
+        final String fullRepoName = repoOwner + "/" + repoName;
+        try {
+            repository = gitHub.getRepository(fullRepoName);
+        } catch (final FileNotFoundException fnf) {
+            throw new FlowRegistryException("Repository [" + fullRepoName + "] not found");
+        }
+
+        // if anonymous then we assume the client has read permissions, otherwise the call to getRepository above would have failed
+        // if not anonymous then we get the identity of the current user and then ask for the permissions the current user has on the repo
+        if (gitHub.isAnonymous()) {
+            canRead = true;
+            canWrite = false;
+        } else {
+            final GHMyself currentUser = gitHub.getMyself();
+            canRead = repository.hasPermission(currentUser, GHPermissionType.READ);
+            canWrite = repository.hasPermission(currentUser, GHPermissionType.WRITE);
+        }
+    }
+
+    /**
+     * @return the repo owner
+     */
+    public String getRepoOwner() {
+        return repoOwner;
+    }
+
+    /**
+     * @return the repo name
+     */
+    public String getRepoName() {
+        return repoName;
+    }
+
+    /**
+     * @return the authentication type this client is configured with
+     */
+    public GitHubAuthenticationType getAuthenticationType() {
+        return authenticationType;
+    }
+
+    /**
+     * @return true if the repository is readable by configured credentials
+     */
+    public boolean getCanRead() {
+        return canRead;
+    }
+
+    /**
+     * @return true if the repository is writable by the configured credentials
+     */
+    public boolean getCanWrite() {
+        return canWrite;
+    }
+
+    /**
+     * Creates or updates the content described by the given request, producing a new commit.
+     *
+     * @param request the request for the content to create; its existing content SHA must be set when updating a file
+     * @return the SHA of the commit that was created
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if the path or branch is not found, or a non I/O error happens calling GitHub
+     */
+    public String createContent(final GitHubCreateContentRequest request) throws IOException, FlowRegistryException {
+        final String branch = request.getBranch();
+        final String resolvedPath = getResolvedPath(request.getPath());
+        LOGGER.debug("Creating content at path [{}] on branch [{}] in repo [{}] ", resolvedPath, branch, repository.getName());
+        return execute(() -> {
+            try {
+                final GHContentUpdateResponse response = repository.createContent()
+                        .branch(branch)
+                        .path(resolvedPath)
+                        .content(request.getContent())
+                        .message(request.getMessage())
+                        .sha(request.getExistingContentSha())
+                        .commit();
+                return response.getCommit().getSha();
+            } catch (final FileNotFoundException fnf) {
+                throwPathOrBranchNotFound(fnf, resolvedPath, branch);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Gets the names of all the branches in the repo.
+     *
+     * @return the set of all branches in the repo
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public Set<String> getBranches() throws IOException, FlowRegistryException {
+        LOGGER.debug("Getting branches for repo [{}]", repository.getName());
+        return execute(() -> repository.getBranches().keySet());
+    }
+
+    /**
+     * Gets an InputStream to read the latest content of the given path from the given branch.
+     * The returned stream already contains the contents of the requested file.
+     *
+     * @param path the path to the content
+     * @param branch the branch
+     * @return an input stream containing the contents of the path
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if the path or branch is not found, or a non I/O error happens calling GitHub
+     */
+    public InputStream getContentFromBranch(final String path, final String branch) throws IOException, FlowRegistryException {
+        final String resolvedPath = getResolvedPath(path);
+        final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+        LOGGER.debug("Getting content for [{}] from branch [{}] in repo [{}] ", resolvedPath, branch, repository.getName());
+
+        return execute(() -> {
+            try {
+                final GHContent ghContent = repository.getFileContent(resolvedPath, branchRef);
+                return ghContent.read();
+            } catch (final FileNotFoundException fnf) {
+                throwPathOrBranchNotFound(fnf, resolvedPath, branchRef);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Gets the content of the given path from the given commit.
+     * The returned stream already contains the contents of the requested file.
+     *
+     * @param path the path to the content
+     * @param commitSha the commit SHA
+     * @return an input stream containing the contents of the path
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if the path or commit is not found, or a non I/O error happens calling GitHub
+     */
+    public InputStream getContentFromCommit(final String path, final String commitSha) throws IOException, FlowRegistryException {
+        final String resolvedPath = getResolvedPath(path);
+        LOGGER.debug("Getting content for [{}] from commit [{}] in repo [{}] ", resolvedPath, commitSha, repository.getName());
+
+        return execute(() -> {
+            try {
+                final GHContent ghContent = repository.getFileContent(resolvedPath, commitSha);
+                return ghContent.read();
+            } catch (final FileNotFoundException fnf) {
+                throw new FlowRegistryException("Path [" + resolvedPath + "] or Commit [" + commitSha + "] not found", fnf);
+            }
+        });
+    }
+
+    /**
+     * Gets the commits for a given path on a given branch, starting from the branch head and fetched in pages
+     * of COMMIT_PAGE_SIZE.
+     *
+     * @param path the path
+     * @param branch the branch
+     * @return the list of commits for the given path
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if the path or branch is not found, or a non I/O error happens calling GitHub
+     */
+    public List<GHCommit> getCommits(final String path, final String branch) throws IOException, FlowRegistryException {
+        final String resolvedPath = getResolvedPath(path);
+        final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+        LOGGER.debug("Getting commits for [{}] from branch [{}] in repo [{}]", resolvedPath, branch, repository.getName());
+
+        return execute(() -> {
+            try {
+                final GHRef branchGhRef = repository.getRef(branchRef);
+                return repository.queryCommits()
+                        .path(resolvedPath)
+                        .from(branchGhRef.getObject().getSha())
+                        .pageSize(COMMIT_PAGE_SIZE)
+                        .list()
+                        .toList();
+            } catch (final FileNotFoundException fnf) {
+                throwPathOrBranchNotFound(fnf, resolvedPath, branchRef);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Gets the top-level directory names: the directories at the root of the repo, or directly within the
+     * configured repository path when one is set.
+     *
+     * @param branch the branch
+     * @return the set of directory names
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public Set<String> getTopLevelDirectoryNames(final String branch) throws IOException, FlowRegistryException {
+        return getDirectoryItems("", branch, GHContent::isDirectory);
+    }
+
+    /**
+     * Gets the names of the directories contained within the given directory.
+     *
+     * @param directory the directory to list
+     * @param branch the branch
+     * @return the set of directory names
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public Set<String> getDirectoryNames(final String directory, final String branch) throws IOException, FlowRegistryException {
+        return getDirectoryItems(directory, branch, GHContent::isDirectory);
+    }
+
+    /**
+     * Gets the names of the files contained within the given directory.
+     *
+     * @param directory the directory to list
+     * @param branch the branch
+     * @return the set of file names
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public Set<String> getFileNames(final String directory, final String branch) throws IOException, FlowRegistryException {
+        return getDirectoryItems(directory, branch, GHContent::isFile);
+    }
+
+    /**
+     * Get the names of the items in the given directory on the given branch, filtered by the provided filter.
+     *
+     * @param directory the directory
+     * @param branch the branch
+     * @param filter the filter to determine which items get included
+     * @return the set of item names
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if the directory or branch is not found, or a non I/O error happens calling GitHub
+     */
+    private Set<String> getDirectoryItems(final String directory, final String branch, final Predicate<GHContent> filter) throws IOException, FlowRegistryException {
+        final String resolvedDirectory = getResolvedPath(directory);
+        final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+        LOGGER.debug("Getting directory items for [{}] from branch [{}] in repo [{}] ", resolvedDirectory, branch, repository.getName());
+
+        return execute(() -> {
+            try {
+                return repository.getDirectoryContent(resolvedDirectory, branchRef).stream()
+                        .filter(filter)
+                        .map(GHContent::getName)
+                        .collect(Collectors.toSet());
+            } catch (final FileNotFoundException fnf) {
+                throwPathOrBranchNotFound(fnf, resolvedDirectory, branchRef);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Gets the current SHA for the given path from the given branch.
+     *
+     * @param path the path to the content
+     * @param branch the branch
+     * @return current sha for the given file, or an empty optional when GitHub reports the content as not found
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if a non I/O error happens calling GitHub
+     */
+    public Optional<String> getContentSha(final String path, final String branch) throws IOException, FlowRegistryException {
+        final String resolvedPath = getResolvedPath(path);
+        final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+        LOGGER.debug("Getting content SHA for [{}] from branch [{}] in repo [{}] ", resolvedPath, branch, repository.getName());
+
+        return execute(() -> {
+            try {
+                final GHContent ghContent = repository.getFileContent(resolvedPath, branchRef);
+                return Optional.of(ghContent.getSha());
+            } catch (final FileNotFoundException e) {
+                LOGGER.warn("Unable to get content SHA for [{}] from branch [{}] because content does not exist", resolvedPath, branch);
+                return Optional.empty();
+            }
+        });
+    }
+
+    /**
+     * Deletes the contents for the given file on the given branch.
+     *
+     * @param filePath the file path to delete
+     * @param commitMessage the commit message for the delete commit
+     * @param branch the branch to delete from
+     * @return the deleted content
+     *
+     * @throws IOException if an I/O error happens calling GitHub
+     * @throws FlowRegistryException if the path or branch is not found, or a non I/O error happens calling GitHub
+     */
+    public GHContent deleteContent(final String filePath, final String commitMessage, final String branch) throws FlowRegistryException, IOException {
+        final String resolvedPath = getResolvedPath(filePath);
+        final String branchRef = BRANCH_REF_PATTERN.formatted(branch);
+        LOGGER.debug("Deleting file [{}] in repo [{}] on branch [{}]", resolvedPath, repository.getName(), branch);
+        return execute(() -> {
+            try {
+                // Read the content from the branch being modified, so the SHA sent with the delete belongs to that
+                // branch; without a ref GitHub resolves the file on the repository's default branch
+                final GHContent ghContent = repository.getFileContent(resolvedPath, branchRef);
+                ghContent.delete(commitMessage, branch);
+                return ghContent;
+            } catch (final FileNotFoundException fnf) {
+                throwPathOrBranchNotFound(fnf, resolvedPath, branch);
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Prefixes the given path with the configured repository path, if any. An empty path resolves to the
+     * repository path itself rather than to a path with a trailing slash.
+     */
+    private String getResolvedPath(final String path) {
+        if (repoPath == null) {
+            return path;
+        }
+        return path == null || path.isEmpty() ? repoPath : repoPath + "/" + path;
+    }
+
+    /**
+     * Normalizes the configured repository path: surrounding whitespace and leading/trailing slashes are removed,
+     * and a blank result (including null, "" or "/") becomes null, meaning the repository root.
+     */
+    private static String normalizeRepoPath(final String path) {
+        if (path == null) {
+            return null;
+        }
+        final String normalized = path.strip().replaceAll("^/+|/+$", "");
+        return normalized.isEmpty() ? null : normalized;
+    }
+
+    private void throwPathOrBranchNotFound(final FileNotFoundException fileNotFoundException, final String path, final String branch) throws FlowRegistryException {
+        throw new FlowRegistryException("Path [" + path + "] or Branch [" + branch + "] not found", fileNotFoundException);
+    }
+
+    /**
+     * Runs the request, propagating FlowRegistryException and IOException unchanged and wrapping any other
+     * exception in a FlowRegistryException that keeps the original as its cause.
+     */
+    private <T> T execute(final GHRequest<T> action) throws FlowRegistryException, IOException {
+        try {
+            return action.execute();
+        } catch (final FlowRegistryException | IOException e) {
+            throw e;
+        } catch (final Exception e) {
+            throw new FlowRegistryException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Functional interface for making a request to GitHub which may throw IOException.
+     *
+     * @param <T> the result of the request
+     */
+    @FunctionalInterface
+    private interface GHRequest<T> {
+
+        T execute() throws IOException, FlowRegistryException;
+
+    }
+
+    /**
+     * @return a new builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for the repository client.
+     */
+    public static class Builder {
+
+        private String apiUrl;
+        private GitHubAuthenticationType authenticationType;
+        private String personalAccessToken;
+        private String appInstallationToken;
+        private String repoOwner;
+        private String repoName;
+        private String repoPath;
+
+        public Builder apiUrl(final String apiUrl) {
+            this.apiUrl = apiUrl;
+            return this;
+        }
+
+        public Builder authenticationType(final GitHubAuthenticationType authenticationType) {
+            this.authenticationType = authenticationType;
+            return this;
+        }
+
+        public Builder personalAccessToken(final String personalAccessToken) {
+            this.personalAccessToken = personalAccessToken;
+            return this;
+        }
+
+        public Builder appInstallationToken(final String appInstallationToken) {
+            this.appInstallationToken = appInstallationToken;
+            return this;
+        }
+
+        public Builder repoOwner(final String repoOwner) {
+            this.repoOwner = repoOwner;
+            return this;
+        }
+
+        public Builder repoName(final String repoName) {
+            this.repoName = repoName;
+            return this;
+        }
+
+        public Builder repoPath(final String repoPath) {
+            this.repoPath = repoPath;
+            return this;
+        }
+
+        /**
+         * Builds the client, which connects to GitHub and resolves the repository and its permissions.
+         */
+        public GitHubRepositoryClient build() throws IOException, FlowRegistryException {
+            return new GitHubRepositoryClient(this);
+        }
+
+    }
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/JacksonFlowSnapshotSerializer.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/JacksonFlowSnapshotSerializer.java
new file mode 100644
index 0000000000..b21b9f7732
--- /dev/null
+++
b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/JacksonFlowSnapshotSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.github;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.fasterxml.jackson.module.jakarta.xmlbind.JakartaXmlBindAnnotationIntrospector;
+import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implementation of {@link FlowSnapshotSerializer} that uses Jackson's ObjectMapper.
+ * <p>
+ * Null values are omitted. Properties are written in alphabetical order with indented output, and Jakarta XML Bind
+ * annotations are honored. Unknown properties are ignored when reading, and VersionedComponentModule is registered
+ * to customize how versioned components are written.
+ */
+public class JacksonFlowSnapshotSerializer implements FlowSnapshotSerializer {
+
+    // Shared, fully configured mapper. NOTE(review): alphabetical ordering plus indentation presumably aims at
+    // stable, diff-friendly JSON in the repository — confirm
+    private static final ObjectMapper OBJECT_MAPPER = JsonMapper.builder()
+            .serializationInclusion(JsonInclude.Include.NON_NULL)
+            .defaultPropertyInclusion(JsonInclude.Value.construct(JsonInclude.Include.NON_NULL, JsonInclude.Include.NON_NULL))
+            .annotationIntrospector(new JakartaXmlBindAnnotationIntrospector(TypeFactory.defaultInstance()))
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+            .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
+            .enable(SerializationFeature.INDENT_OUTPUT)
+            .addModule(new VersionedComponentModule())
+            .build();
+
+    /**
+     * Writes the snapshot as JSON text.
+     */
+    @Override
+    public String serialize(final RegisteredFlowSnapshot flowSnapshot) throws IOException {
+        return OBJECT_MAPPER.writeValueAsString(flowSnapshot);
+    }
+
+    /**
+     * Reads a snapshot from JSON; the caller remains responsible for closing the stream.
+     */
+    @Override
+    public RegisteredFlowSnapshot deserialize(final InputStream inputStream) throws IOException {
+        return OBJECT_MAPPER.readValue(inputStream, RegisteredFlowSnapshot.class);
+    }
+
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/VersionedComponentModule.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/VersionedComponentModule.java
new file mode 100644
index 0000000000..813c6bad01
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/java/org/apache/nifi/github/VersionedComponentModule.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.nifi.github;
+
+import com.fasterxml.jackson.databind.BeanDescription;
+import com.fasterxml.jackson.databind.SerializationConfig;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.ser.BeanPropertyWriter;
+import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.VersionedComponent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Jackson module that removes instance-specific identifier properties
+ * ({@code instanceIdentifier}, {@code instanceGroupId}) from serialized versioned components.
+ */
+public class VersionedComponentModule extends SimpleModule {
+
+    // Property names dropped from VersionedComponent and ConnectableComponent output
+    private static final Set<String> EXCLUDE_JSON_FIELDS = Set.of("instanceIdentifier", "instanceGroupId");
+
+    @Override
+    public void setupModule(final SetupContext context) {
+        super.setupModule(context);
+        final BeanSerializerModifier excludingModifier = new VersionedComponentBeanSerializerModifier();
+        context.addBeanSerializerModifier(excludingModifier);
+    }
+
+    /**
+     * Filters the excluded property names out of the serializers built for VersionedComponent and
+     * ConnectableComponent types; serializers for every other bean type are left to the default behavior.
+     */
+    private static class VersionedComponentBeanSerializerModifier extends BeanSerializerModifier {
+        @Override
+        public List<BeanPropertyWriter> changeProperties(final SerializationConfig config, final BeanDescription beanDesc, final List<BeanPropertyWriter> beanProperties) {
+            final Class<?> beanType = beanDesc.getBeanClass();
+            final boolean componentType = VersionedComponent.class.isAssignableFrom(beanType)
+                    || ConnectableComponent.class.isAssignableFrom(beanType);
+
+            if (componentType) {
+                // copy first so the incoming list is not modified and a mutable list is handed back to Jackson
+                final List<BeanPropertyWriter> retainedProperties = new ArrayList<>(beanProperties);
+                retainedProperties.removeIf(writer -> EXCLUDE_JSON_FIELDS.contains(writer.getName()));
+                return retainedProperties;
+            }
+            return super.changeProperties(config, beanDesc, beanProperties);
+        }
+    }
+}
diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient
new file mode 100644
index 0000000000..bd467f5e24
--- /dev/null
+++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowRegistryClient
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.github.GitHubFlowRegistryClient \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java new file mode 100644 index 0000000000..0b09a9671b --- /dev/null +++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-extensions/src/test/java/org/apache/nifi/github/GitHubFlowRegistryClientTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.github; + +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.registry.flow.FlowRegistryBucket; +import org.apache.nifi.registry.flow.FlowRegistryClientConfigurationContext; +import org.apache.nifi.registry.flow.FlowRegistryClientInitializationContext; +import org.apache.nifi.registry.flow.FlowRegistryException; +import org.apache.nifi.registry.flow.RegisterAction; +import org.apache.nifi.registry.flow.RegisteredFlow; +import org.apache.nifi.registry.flow.RegisteredFlowSnapshot; +import org.apache.nifi.registry.flow.RegisteredFlowSnapshotMetadata; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class GitHubFlowRegistryClientTest { /* tests GitHubFlowRegistryClient against a mocked GitHubRepositoryClient and FlowSnapshotSerializer */ + + static final String DEFAULT_REPO_PATH = "some-path"; + static final String DEFAULT_REPO_BRANCH = "some-branch"; + + private GitHubRepositoryClient repositoryClient; + private FlowSnapshotSerializer flowSnapshotSerializer; + private GitHubFlowRegistryClient flowRegistryClient; + private FlowRegistryClientConfigurationContext clientConfigurationContext; + private ComponentLog componentLog; + + @BeforeEach + public void setup() throws IOException, FlowRegistryException { /* builds the client under test around fresh mocks before each test */ + repositoryClient = mock(GitHubRepositoryClient.class); + flowSnapshotSerializer =
mock(FlowSnapshotSerializer.class); + flowRegistryClient = new TestableGitHubRepositoryClient(repositoryClient, flowSnapshotSerializer); /* subclass that hands the mocks above to the client */ + + clientConfigurationContext = mock(FlowRegistryClientConfigurationContext.class); + componentLog = mock(ComponentLog.class); + + final FlowRegistryClientInitializationContext initializationContext = mock(FlowRegistryClientInitializationContext.class); + when(initializationContext.getLogger()).thenReturn(componentLog); + flowRegistryClient.initialize(initializationContext); /* the initialization context mock stubs only getLogger() */ + + when(repositoryClient.getCanRead()).thenReturn(true); + when(repositoryClient.getCanWrite()).thenReturn(true); + when(repositoryClient.getTopLevelDirectoryNames(anyString())).thenReturn(Set.of("existing-bucket")); /* for any argument, report a single top-level directory name */ + } + + @Test + public void testRegisterFlow() throws IOException, FlowRegistryException { /* registerFlow should echo the flow's fields and write one snapshot file through createContent */ + setupClientConfigurationContextWithDefaults(); + + final String serializedSnapshotContent = "placeholder"; + when(flowSnapshotSerializer.serialize(any(RegisteredFlowSnapshot.class))).thenReturn(serializedSnapshotContent); + + final RegisteredFlow incomingFlow = createIncomingRegisteredFlow(); + final RegisteredFlow resultFlow = flowRegistryClient.registerFlow(clientConfigurationContext, incomingFlow); + assertEquals(incomingFlow.getIdentifier(), resultFlow.getIdentifier()); + assertEquals(incomingFlow.getName(), resultFlow.getName()); + assertEquals(incomingFlow.getBucketIdentifier(), resultFlow.getBucketIdentifier()); + assertEquals(incomingFlow.getBucketName(), resultFlow.getBucketName()); + assertEquals(incomingFlow.getBranch(), resultFlow.getBranch()); + + final ArgumentCaptor<GitHubCreateContentRequest> argumentCaptor = ArgumentCaptor.forClass(GitHubCreateContentRequest.class); + verify(repositoryClient).createContent(argumentCaptor.capture()); /* Mockito verify() without a mode requires exactly one invocation */ + + final GitHubCreateContentRequest capturedArgument = argumentCaptor.getValue(); + assertEquals(incomingFlow.getBranch(), capturedArgument.getBranch()); +
assertEquals(GitHubFlowRegistryClient.SNAPSHOT_FILE_PATH_FORMAT.formatted(incomingFlow.getBucketIdentifier(), incomingFlow.getIdentifier()), capturedArgument.getPath()); + assertEquals(serializedSnapshotContent, capturedArgument.getContent()); + assertNull(capturedArgument.getExistingContentSha()); /* no existing content SHA is expected when registering a new flow */ + } + + @Test + public void testRegisterFlowSnapshot() throws IOException, FlowRegistryException { /* registerFlowSnapshot with COMMIT should return the flow, metadata and bucket of the snapshot */ + setupClientConfigurationContextWithDefaults(); + + final RegisteredFlow incomingFlow = createIncomingRegisteredFlow(); + + final RegisteredFlowSnapshotMetadata incomingMetadata = new RegisteredFlowSnapshotMetadata(); + incomingMetadata.setBranch(incomingFlow.getBranch()); + incomingMetadata.setBucketIdentifier(incomingFlow.getBucketIdentifier()); + incomingMetadata.setFlowIdentifier(incomingFlow.getIdentifier()); + incomingMetadata.setComments("Unit test"); + + final RegisteredFlowSnapshot incomingSnapshot = new RegisteredFlowSnapshot(); + incomingSnapshot.setFlow(incomingFlow); + incomingSnapshot.setSnapshotMetadata(incomingMetadata); + incomingSnapshot.setFlowContents(new VersionedProcessGroup()); + + final String snapshotFilePath = GitHubFlowRegistryClient.SNAPSHOT_FILE_PATH_FORMAT.formatted(incomingFlow.getBucketIdentifier(), incomingFlow.getIdentifier()); + when(repositoryClient.getContentFromBranch(snapshotFilePath, DEFAULT_REPO_BRANCH)).thenReturn(new ByteArrayInputStream(new byte[0])); /* stream content is irrelevant: deserialize() is stubbed next to return incomingSnapshot */ + when(flowSnapshotSerializer.deserialize(any(InputStream.class))).thenReturn(incomingSnapshot); + + final String serializedSnapshotContent = "placeholder"; + when(flowSnapshotSerializer.serialize(any(RegisteredFlowSnapshot.class))).thenReturn(serializedSnapshotContent); + + final String commitSha = "commitSha"; + when(repositoryClient.createContent(any(GitHubCreateContentRequest.class))).thenReturn(commitSha); + + final RegisteredFlowSnapshot resultSnapshot = flowRegistryClient.registerFlowSnapshot(clientConfigurationContext, incomingSnapshot, RegisterAction.COMMIT); +
assertNotNull(resultSnapshot); + + final RegisteredFlow resultFlow = resultSnapshot.getFlow(); + assertNotNull(resultFlow); + assertEquals(incomingFlow.getIdentifier(), resultFlow.getIdentifier()); + assertEquals(incomingFlow.getName(), resultFlow.getName()); + assertEquals(incomingFlow.getBucketIdentifier(), resultFlow.getBucketIdentifier()); + assertEquals(incomingFlow.getBucketName(), resultFlow.getBucketName()); + assertEquals(incomingFlow.getBranch(), resultFlow.getBranch()); + + final RegisteredFlowSnapshotMetadata resultMetadata = resultSnapshot.getSnapshotMetadata(); + assertNotNull(resultMetadata); + assertEquals(incomingMetadata.getBranch(), resultMetadata.getBranch()); + assertEquals(incomingMetadata.getBucketIdentifier(), resultMetadata.getBucketIdentifier()); + assertEquals(incomingMetadata.getFlowIdentifier(), resultMetadata.getFlowIdentifier()); + assertEquals(incomingMetadata.getComments(), resultMetadata.getComments()); + + final FlowRegistryBucket resultBucket = resultSnapshot.getBucket(); + assertNotNull(resultBucket); + assertEquals(incomingMetadata.getBucketIdentifier(), resultBucket.getIdentifier()); + assertEquals(incomingMetadata.getBucketIdentifier(), resultBucket.getName()); /* the bucket name is expected to equal the bucket identifier */ + } + + private RegisteredFlow createIncomingRegisteredFlow() { /* flow fixture on DEFAULT_REPO_BRANCH */ + final RegisteredFlow incomingFlow = new RegisteredFlow(); + incomingFlow.setIdentifier("Flow1"); + incomingFlow.setName("Flow1"); + incomingFlow.setBucketIdentifier("Bucket1"); + incomingFlow.setBucketName("Bucket1"); + incomingFlow.setBranch(DEFAULT_REPO_BRANCH); + return incomingFlow; + } + + private void setupClientConfigurationContext(final String repoPath, final String branch) { /* stubs the REPOSITORY_PATH and REPOSITORY_BRANCH property values */ + final PropertyValue repoPathPropertyValue = createMockPropertyValue(repoPath); + when(clientConfigurationContext.getProperty(GitHubFlowRegistryClient.REPOSITORY_PATH)).thenReturn(repoPathPropertyValue); + + final PropertyValue branchPropertyValue = createMockPropertyValue(branch); +
when(clientConfigurationContext.getProperty(GitHubFlowRegistryClient.REPOSITORY_BRANCH)).thenReturn(branchPropertyValue); + } + + private void setupClientConfigurationContextWithDefaults() { + setupClientConfigurationContext(DEFAULT_REPO_PATH, DEFAULT_REPO_BRANCH); + } + + private PropertyValue createMockPropertyValue(final String value) { /* PropertyValue mock whose getValue() returns the given value */ + final PropertyValue propertyValue = mock(PropertyValue.class); + when(propertyValue.getValue()).thenReturn(value); + return propertyValue; + } + + private static class TestableGitHubRepositoryClient extends GitHubFlowRegistryClient { /* NOTE(review): despite its name this subclasses GitHubFlowRegistryClient, not GitHubRepositoryClient; it overrides the factory methods to inject the mocks */ + private final GitHubRepositoryClient repositoryClient; + private final FlowSnapshotSerializer flowSnapshotSerializer; + + public TestableGitHubRepositoryClient(final GitHubRepositoryClient repositoryClient, final FlowSnapshotSerializer flowSnapshotSerializer) { + this.repositoryClient = repositoryClient; + this.flowSnapshotSerializer = flowSnapshotSerializer; + } + + @Override + protected GitHubRepositoryClient createRepositoryClient(final FlowRegistryClientConfigurationContext context) throws IOException, FlowRegistryException { + return repositoryClient; /* ignore the context and return the injected mock */ + } + + @Override + protected FlowSnapshotSerializer createFlowSnapshotSerializer(final FlowRegistryClientInitializationContext initializationContext) { + return flowSnapshotSerializer; /* return the injected mock serializer */ + } + } + +} diff --git a/nifi-extension-bundles/nifi-github-bundle/nifi-github-nar/pom.xml b/nifi-extension-bundles/nifi-github-bundle/nifi-github-nar/pom.xml new file mode 100644 index 0000000000..25b10f588d --- /dev/null +++ b/nifi-extension-bundles/nifi-github-bundle/nifi-github-nar/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-github-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-github-nar</artifactId> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-github-extensions</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-shared-nar</artifactId> + <version>2.0.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + </dependencies> +</project> + diff --git a/nifi-extension-bundles/nifi-github-bundle/pom.xml b/nifi-extension-bundles/nifi-github-bundle/pom.xml new file mode 100644 index 0000000000..d33cf43641 --- /dev/null +++ b/nifi-extension-bundles/nifi-github-bundle/pom.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>nifi-standard-shared-bom</artifactId> + <groupId>org.apache.nifi</groupId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../nifi-standard-shared-bundle/nifi-standard-shared-bom</relativePath> + </parent> + + <artifactId>nifi-github-bundle</artifactId> + <packaging>pom</packaging> + + <properties> + <github-api.version>1.321</github-api.version> + </properties> + + <modules> + <module>nifi-github-extensions</module> + <module>nifi-github-nar</module> + </modules> +</project> diff --git a/nifi-extension-bundles/pom.xml b/nifi-extension-bundles/pom.xml index 192bccb20c..c999eec3ac 100755 --- a/nifi-extension-bundles/pom.xml +++ b/nifi-extension-bundles/pom.xml @@ -104,5 +104,6 @@ <module>nifi-jolt-bundle</module> <module>nifi-questdb-bundle</module> <module>nifi-protobuf-bundle</module> + <module>nifi-github-bundle</module> </modules> </project> diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java 
index a099a15e62..62d6d0f206 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/FlowAnalyzingRegistryClientNode.java @@ -461,6 +461,11 @@ public final class FlowAnalyzingRegistryClientNode implements FlowRegistryClient return node.getLatestVersion(context, flowLocation); } + @Override + public String generateFlowId(final String flowName) throws IOException, FlowRegistryException { + return node.generateFlowId(flowName); + } + @Override public void setComponent(final LoggableComponent<FlowRegistryClient> component) { node.setComponent(component); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java index 0fbe4a3119..4fed6f6452 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/StandardFlowRegistryClientNode.java @@ -284,6 +284,11 @@ public final class StandardFlowRegistryClientNode extends AbstractComponentNode return execute(() -> client.get().getComponent().getLatestVersion(getConfigurationContext(context), flowLocation)); } + @Override + public String generateFlowId(final String flowName) throws IOException, FlowRegistryException { + return execute(() -> client.get().getComponent().generateFlowId(flowName)); + } + @Override public void setComponent(final LoggableComponent<FlowRegistryClient> component) { client.set(component); @@ -299,7 +304,7 @@ public final class 
StandardFlowRegistryClientNode extends AbstractComponentNode } final List<String> validationProblems = validationResults.stream() - .map(e -> e.getExplanation()) + .map(ValidationResult::getExplanation) .collect(Collectors.toList()); throw new FlowRegistryInvalidException(validationProblems); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java index bc4b584490..e58457e387 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClientNode.java @@ -63,5 +63,7 @@ public interface FlowRegistryClientNode extends ComponentNode { Set<RegisteredFlowSnapshotMetadata> getFlowVersions(FlowRegistryClientUserContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException; Optional<String> getLatestVersion(FlowRegistryClientUserContext context, FlowLocation flowLocation) throws FlowRegistryException, IOException; + String generateFlowId(String flowName) throws IOException, FlowRegistryException; + void setComponent(LoggableComponent<FlowRegistryClient> component); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index c88dc8818e..0244136ba4 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -394,7 
+394,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; @@ -3276,7 +3275,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final FlowRegistryBranch defaultBranch = flowRegistryDAO.getDefaultBranchForUser(clientUserContext, registryClientId); return flowRegistryDAO.getFlowVersionsForUser(clientUserContext, registryClientId, defaultBranch.getName(), bucketId, flowId).stream() .map(md -> createVersionedFlowSnapshotMetadataEntity(registryClientId, md)) - .collect(Collectors.toSet()); + .collect(Collectors.toCollection(LinkedHashSet::new)); } @Override @@ -5021,7 +5020,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { final Map<String, ParameterProviderReference> parameterProviderReferences = new HashMap<>(); final Map<String, VersionedParameterContext> parameterContexts = createVersionedParameterContexts(processGroup, parameterProviderReferences); - final String flowId = versionedFlowDto.getFlowId() == null ? 
UUID.randomUUID().toString() : versionedFlowDto.getFlowId(); final String registryId = requestEntity.getVersionedFlow().getRegistryId(); final String selectedBranch; @@ -5040,7 +5038,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { versionedFlow.setDescription(versionedFlowDto.getDescription()); versionedFlow.setLastModifiedTimestamp(versionedFlow.getCreatedTimestamp()); versionedFlow.setName(versionedFlowDto.getFlowName()); - versionedFlow.setIdentifier(flowId); + versionedFlow.setIdentifier(versionedFlowDto.getFlowId()); // Add the Versioned Flow and first snapshot to the Flow Registry final RegisteredFlowSnapshot registeredSnapshot; @@ -5069,7 +5067,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { // then we need to capture the created versioned flow information as a partial successful result. if (registerNewFlow) { try { - final FlowLocation flowLocation = new FlowLocation(selectedBranch, versionedFlowDto.getBucketId(), flowId); + final FlowLocation flowLocation = new FlowLocation(selectedBranch, versionedFlowDto.getBucketId(), registeredFlow.getIdentifier()); final FlowRegistryClientNode flowRegistryClientNode = flowRegistryDAO.getFlowRegistryClient(registryId); flowRegistryClientNode.deregisterFlow(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), flowLocation); } catch (final IOException | FlowRegistryException e2) { @@ -5357,6 +5355,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } try { + final String generatedId = registry.generateFlowId(flow.getName()); + flow.setIdentifier(generatedId); return registry.registerFlow(FlowRegistryClientContextFactory.getContextForUser(NiFiUserUtils.getNiFiUser()), flow); } catch (final IOException | FlowRegistryException e) { throw new NiFiCoreException("Failed to register flow with Flow Registry due to " + e.getMessage(), e); diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java index 352c9767db..a6d04a165c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java @@ -1247,7 +1247,7 @@ public class VersionsResource extends FlowUpdateResource<VersionControlInformati final RegisteredFlowSnapshotMetadata metadata = flowSnapshot.getSnapshotMetadata(); final VersionControlInformationDTO versionControlInfo = new VersionControlInformationDTO(); - versionControlInfo.setBranch(requestVci.getBranch()); + versionControlInfo.setBranch(metadata.getBranch()); versionControlInfo.setBucketId(metadata.getBucketIdentifier()); versionControlInfo.setBucketName(bucket.getName()); versionControlInfo.setFlowDescription(flow.getDescription()); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java index 5f4dba8e6d..430f90265f 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardFlowRegistryDAO.java @@ -198,7 +198,12 @@ public class StandardFlowRegistryDAO extends ComponentDAO implements FlowRegistr final FlowLocation flowLocation = new FlowLocation(branch, bucketId, flowId); final Set<RegisteredFlowSnapshotMetadata> flowVersions = flowRegistry.getFlowVersions(context, flowLocation); - final 
Set<RegisteredFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>(Comparator.comparingLong(RegisteredFlowSnapshotMetadata::getTimestamp)); + + // if somehow the timestamp of two versions is exactly the same, then we use version as a secondary comparison, + // otherwise one of the objects won't be added to the set since compareTo returns 0 indicating it already exists + final Set<RegisteredFlowSnapshotMetadata> sortedFlowVersions = new TreeSet<>( + Comparator.comparingLong(RegisteredFlowSnapshotMetadata::getTimestamp) + .thenComparing(RegisteredFlowSnapshotMetadata::getVersion)); sortedFlowVersions.addAll(flowVersions); return sortedFlowVersions; } catch (final IOException | FlowRegistryException ioe) {